atomic_writer.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package util
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io/ioutil"
  18. "os"
  19. "path"
  20. "path/filepath"
  21. "runtime"
  22. "strings"
  23. "time"
  24. "k8s.io/klog"
  25. "k8s.io/apimachinery/pkg/util/sets"
  26. )
  // Limits enforced by validatePath on payload paths.
  const (
  	// maxFileNameLength is the maximum length, in bytes, of a single path
  	// element (file or directory name).
  	maxFileNameLength = 255
  	// maxPathLength is the maximum length, in bytes, of a whole payload path.
  	maxPathLength = 4096
  )
  // AtomicWriter handles atomically projecting content for a set of files into
  // a target directory.
  //
  // Note:
  //
  //  1. AtomicWriter reserves the set of pathnames starting with `..`.
  //  2. AtomicWriter offers no concurrency guarantees and must be synchronized
  //     by the caller.
  //
  // The visible files in this volume are symlinks to files in the writer's data
  // directory. Actual files are stored in a hidden timestamped directory which
  // is symlinked to by the data directory. The timestamped directory and
  // data directory symlink are created in the writer's target dir. This scheme
  // allows the files to be atomically updated by changing the target of the
  // data directory symlink.
  //
  // Consumers of the target directory can monitor the ..data symlink using
  // inotify or fanotify to receive events when the content in the volume is
  // updated.
  type AtomicWriter struct {
  	// targetDir is the directory that payloads are projected into; the data
  	// directory symlink, the timestamped directories and the user-visible
  	// symlinks are all created directly inside it.
  	targetDir string
  	// logContext is the prefix put in front of the log messages emitted by
  	// the writer.
  	logContext string
  }
  // FileProjection contains file Data and access Mode
  type FileProjection struct {
  	// Data is the content written to the projected file.
  	Data []byte
  	// Mode is converted to an os.FileMode and applied to the projected file
  	// when it is written.
  	Mode int32
  }
  59. // NewAtomicWriter creates a new AtomicWriter configured to write to the given
  60. // target directory, or returns an error if the target directory does not exist.
  61. func NewAtomicWriter(targetDir string, logContext string) (*AtomicWriter, error) {
  62. _, err := os.Stat(targetDir)
  63. if os.IsNotExist(err) {
  64. return nil, err
  65. }
  66. return &AtomicWriter{targetDir: targetDir, logContext: logContext}, nil
  67. }
  // Reserved names created directly inside the writer's target directory.
  const (
  	// dataDirName is the name of the data directory symlink, which points at
  	// the current timestamped directory.
  	dataDirName = "..data"
  	// newDataDirName is the name of the temporary symlink that is created for
  	// a new timestamped directory and then renamed onto dataDirName.
  	newDataDirName = "..data_tmp"
  )
  72. // Write does an atomic projection of the given payload into the writer's target
  73. // directory. Input paths must not begin with '..'.
  74. //
  75. // The Write algorithm is:
  76. //
  77. // 1. The payload is validated; if the payload is invalid, the function returns
  78. // 2.  The current timestamped directory is detected by reading the data directory
  79. // symlink
  80. // 3. The old version of the volume is walked to determine whether any
  81. // portion of the payload was deleted and is still present on disk.
  82. // 4. The data in the current timestamped directory is compared to the projected
  83. // data to determine if an update is required.
  84. // 5.  A new timestamped dir is created
  85. // 6. The payload is written to the new timestamped directory
  86. // 7.  Symlinks and directory for new user-visible files are created (if needed).
  87. //
  88. // For example, consider the files:
  89. // <target-dir>/podName
  90. // <target-dir>/user/labels
  91. // <target-dir>/k8s/annotations
  92. //
  93. // The user visible files are symbolic links into the internal data directory:
  94. // <target-dir>/podName -> ..data/podName
  95. // <target-dir>/usr -> ..data/usr
  96. // <target-dir>/k8s -> ..data/k8s
  97. //
  98. // The data directory itself is a link to a timestamped directory with
  99. // the real data:
  100. // <target-dir>/..data -> ..2016_02_01_15_04_05.12345678/
  101. // 8.  A symlink to the new timestamped directory ..data_tmp is created that will
  102. // become the new data directory
  103. // 9.  The new data directory symlink is renamed to the data directory; rename is atomic
  104. // 10. Old paths are removed from the user-visible portion of the target directory
  105. // 11.  The previous timestamped directory is removed, if it exists
  106. func (w *AtomicWriter) Write(payload map[string]FileProjection) error {
  107. // (1)
  108. cleanPayload, err := validatePayload(payload)
  109. if err != nil {
  110. klog.Errorf("%s: invalid payload: %v", w.logContext, err)
  111. return err
  112. }
  113. // (2)
  114. dataDirPath := filepath.Join(w.targetDir, dataDirName)
  115. oldTsDir, err := os.Readlink(dataDirPath)
  116. if err != nil {
  117. if !os.IsNotExist(err) {
  118. klog.Errorf("%s: error reading link for data directory: %v", w.logContext, err)
  119. return err
  120. }
  121. // although Readlink() returns "" on err, don't be fragile by relying on it (since it's not specified in docs)
  122. // empty oldTsDir indicates that it didn't exist
  123. oldTsDir = ""
  124. }
  125. oldTsPath := filepath.Join(w.targetDir, oldTsDir)
  126. var pathsToRemove sets.String
  127. // if there was no old version, there's nothing to remove
  128. if len(oldTsDir) != 0 {
  129. // (3)
  130. pathsToRemove, err = w.pathsToRemove(cleanPayload, oldTsPath)
  131. if err != nil {
  132. klog.Errorf("%s: error determining user-visible files to remove: %v", w.logContext, err)
  133. return err
  134. }
  135. // (4)
  136. if should, err := shouldWritePayload(cleanPayload, oldTsPath); err != nil {
  137. klog.Errorf("%s: error determining whether payload should be written to disk: %v", w.logContext, err)
  138. return err
  139. } else if !should && len(pathsToRemove) == 0 {
  140. klog.V(4).Infof("%s: no update required for target directory %v", w.logContext, w.targetDir)
  141. return nil
  142. } else {
  143. klog.V(4).Infof("%s: write required for target directory %v", w.logContext, w.targetDir)
  144. }
  145. }
  146. // (5)
  147. tsDir, err := w.newTimestampDir()
  148. if err != nil {
  149. klog.V(4).Infof("%s: error creating new ts data directory: %v", w.logContext, err)
  150. return err
  151. }
  152. tsDirName := filepath.Base(tsDir)
  153. // (6)
  154. if err = w.writePayloadToDir(cleanPayload, tsDir); err != nil {
  155. klog.Errorf("%s: error writing payload to ts data directory %s: %v", w.logContext, tsDir, err)
  156. return err
  157. }
  158. klog.V(4).Infof("%s: performed write of new data to ts data directory: %s", w.logContext, tsDir)
  159. // (7)
  160. if err = w.createUserVisibleFiles(cleanPayload); err != nil {
  161. klog.Errorf("%s: error creating visible symlinks in %s: %v", w.logContext, w.targetDir, err)
  162. return err
  163. }
  164. // (8)
  165. newDataDirPath := filepath.Join(w.targetDir, newDataDirName)
  166. if err = os.Symlink(tsDirName, newDataDirPath); err != nil {
  167. os.RemoveAll(tsDir)
  168. klog.Errorf("%s: error creating symbolic link for atomic update: %v", w.logContext, err)
  169. return err
  170. }
  171. // (9)
  172. if runtime.GOOS == "windows" {
  173. os.Remove(dataDirPath)
  174. err = os.Symlink(tsDirName, dataDirPath)
  175. os.Remove(newDataDirPath)
  176. } else {
  177. err = os.Rename(newDataDirPath, dataDirPath)
  178. }
  179. if err != nil {
  180. os.Remove(newDataDirPath)
  181. os.RemoveAll(tsDir)
  182. klog.Errorf("%s: error renaming symbolic link for data directory %s: %v", w.logContext, newDataDirPath, err)
  183. return err
  184. }
  185. // (10)
  186. if err = w.removeUserVisiblePaths(pathsToRemove); err != nil {
  187. klog.Errorf("%s: error removing old visible symlinks: %v", w.logContext, err)
  188. return err
  189. }
  190. // (11)
  191. if len(oldTsDir) > 0 {
  192. if err = os.RemoveAll(oldTsPath); err != nil {
  193. klog.Errorf("%s: error removing old data directory %s: %v", w.logContext, oldTsDir, err)
  194. return err
  195. }
  196. }
  197. return nil
  198. }
  199. // validatePayload returns an error if any path in the payload returns a copy of the payload with the paths cleaned.
  200. func validatePayload(payload map[string]FileProjection) (map[string]FileProjection, error) {
  201. cleanPayload := make(map[string]FileProjection)
  202. for k, content := range payload {
  203. if err := validatePath(k); err != nil {
  204. return nil, err
  205. }
  206. cleanPayload[filepath.Clean(k)] = content
  207. }
  208. return cleanPayload, nil
  209. }
  210. // validatePath validates a single path, returning an error if the path is
  211. // invalid. paths may not:
  212. //
  213. // 1. be absolute
  214. // 2. contain '..' as an element
  215. // 3. start with '..'
  216. // 4. contain filenames larger than 255 characters
  217. // 5. be longer than 4096 characters
  218. func validatePath(targetPath string) error {
  219. // TODO: somehow unify this with the similar api validation,
  220. // validateVolumeSourcePath; the error semantics are just different enough
  221. // from this that it was time-prohibitive trying to find the right
  222. // refactoring to re-use.
  223. if targetPath == "" {
  224. return fmt.Errorf("invalid path: must not be empty: %q", targetPath)
  225. }
  226. if path.IsAbs(targetPath) {
  227. return fmt.Errorf("invalid path: must be relative path: %s", targetPath)
  228. }
  229. if len(targetPath) > maxPathLength {
  230. return fmt.Errorf("invalid path: must be less than or equal to %d characters", maxPathLength)
  231. }
  232. items := strings.Split(targetPath, string(os.PathSeparator))
  233. for _, item := range items {
  234. if item == ".." {
  235. return fmt.Errorf("invalid path: must not contain '..': %s", targetPath)
  236. }
  237. if len(item) > maxFileNameLength {
  238. return fmt.Errorf("invalid path: filenames must be less than or equal to %d characters", maxFileNameLength)
  239. }
  240. }
  241. if strings.HasPrefix(items[0], "..") && len(items[0]) > 2 {
  242. return fmt.Errorf("invalid path: must not start with '..': %s", targetPath)
  243. }
  244. return nil
  245. }
  246. // shouldWritePayload returns whether the payload should be written to disk.
  247. func shouldWritePayload(payload map[string]FileProjection, oldTsDir string) (bool, error) {
  248. for userVisiblePath, fileProjection := range payload {
  249. shouldWrite, err := shouldWriteFile(filepath.Join(oldTsDir, userVisiblePath), fileProjection.Data)
  250. if err != nil {
  251. return false, err
  252. }
  253. if shouldWrite {
  254. return true, nil
  255. }
  256. }
  257. return false, nil
  258. }
  // shouldWriteFile reports whether content differs from what is currently
  // stored at path. A path that does not exist always needs writing. If the
  // existing file cannot be read, the read error is returned with false.
  func shouldWriteFile(path string, content []byte) (bool, error) {
  	if _, statErr := os.Lstat(path); os.IsNotExist(statErr) {
  		return true, nil
  	}

  	existing, readErr := ioutil.ReadFile(path)
  	if readErr != nil {
  		return false, readErr
  	}

  	if bytes.Equal(content, existing) {
  		return false, nil
  	}
  	return true, nil
  }
  271. // pathsToRemove walks the current version of the data directory and
  272. // determines which paths should be removed (if any) after the payload is
  273. // written to the target directory.
  274. func (w *AtomicWriter) pathsToRemove(payload map[string]FileProjection, oldTsDir string) (sets.String, error) {
  275. paths := sets.NewString()
  276. visitor := func(path string, info os.FileInfo, err error) error {
  277. relativePath := strings.TrimPrefix(path, oldTsDir)
  278. relativePath = strings.TrimPrefix(relativePath, string(os.PathSeparator))
  279. if relativePath == "" {
  280. return nil
  281. }
  282. paths.Insert(relativePath)
  283. return nil
  284. }
  285. err := filepath.Walk(oldTsDir, visitor)
  286. if os.IsNotExist(err) {
  287. return nil, nil
  288. } else if err != nil {
  289. return nil, err
  290. }
  291. klog.V(5).Infof("%s: current paths: %+v", w.targetDir, paths.List())
  292. newPaths := sets.NewString()
  293. for file := range payload {
  294. // add all subpaths for the payload to the set of new paths
  295. // to avoid attempting to remove non-empty dirs
  296. for subPath := file; subPath != ""; {
  297. newPaths.Insert(subPath)
  298. subPath, _ = filepath.Split(subPath)
  299. subPath = strings.TrimSuffix(subPath, string(os.PathSeparator))
  300. }
  301. }
  302. klog.V(5).Infof("%s: new paths: %+v", w.targetDir, newPaths.List())
  303. result := paths.Difference(newPaths)
  304. klog.V(5).Infof("%s: paths to remove: %+v", w.targetDir, result)
  305. return result, nil
  306. }
  307. // newTimestampDir creates a new timestamp directory
  308. func (w *AtomicWriter) newTimestampDir() (string, error) {
  309. tsDir, err := ioutil.TempDir(w.targetDir, time.Now().UTC().Format("..2006_01_02_15_04_05."))
  310. if err != nil {
  311. klog.Errorf("%s: unable to create new temp directory: %v", w.logContext, err)
  312. return "", err
  313. }
  314. // 0755 permissions are needed to allow 'group' and 'other' to recurse the
  315. // directory tree. do a chmod here to ensure that permissions are set correctly
  316. // regardless of the process' umask.
  317. err = os.Chmod(tsDir, 0755)
  318. if err != nil {
  319. klog.Errorf("%s: unable to set mode on new temp directory: %v", w.logContext, err)
  320. return "", err
  321. }
  322. return tsDir, nil
  323. }
  324. // writePayloadToDir writes the given payload to the given directory. The
  325. // directory must exist.
  326. func (w *AtomicWriter) writePayloadToDir(payload map[string]FileProjection, dir string) error {
  327. for userVisiblePath, fileProjection := range payload {
  328. content := fileProjection.Data
  329. mode := os.FileMode(fileProjection.Mode)
  330. fullPath := filepath.Join(dir, userVisiblePath)
  331. baseDir, _ := filepath.Split(fullPath)
  332. err := os.MkdirAll(baseDir, os.ModePerm)
  333. if err != nil {
  334. klog.Errorf("%s: unable to create directory %s: %v", w.logContext, baseDir, err)
  335. return err
  336. }
  337. err = ioutil.WriteFile(fullPath, content, mode)
  338. if err != nil {
  339. klog.Errorf("%s: unable to write file %s with mode %v: %v", w.logContext, fullPath, mode, err)
  340. return err
  341. }
  342. // Chmod is needed because ioutil.WriteFile() ends up calling
  343. // open(2) to create the file, so the final mode used is "mode &
  344. // ~umask". But we want to make sure the specified mode is used
  345. // in the file no matter what the umask is.
  346. err = os.Chmod(fullPath, mode)
  347. if err != nil {
  348. klog.Errorf("%s: unable to write file %s with mode %v: %v", w.logContext, fullPath, mode, err)
  349. }
  350. }
  351. return nil
  352. }
  353. // createUserVisibleFiles creates the relative symlinks for all the
  354. // files configured in the payload. If the directory in a file path does not
  355. // exist, it is created.
  356. //
  357. // Viz:
  358. // For files: "bar", "foo/bar", "baz/bar", "foo/baz/blah"
  359. // the following symlinks are created:
  360. // bar -> ..data/bar
  361. // foo -> ..data/foo
  362. // baz -> ..data/baz
  363. func (w *AtomicWriter) createUserVisibleFiles(payload map[string]FileProjection) error {
  364. for userVisiblePath := range payload {
  365. slashpos := strings.Index(userVisiblePath, string(os.PathSeparator))
  366. if slashpos == -1 {
  367. slashpos = len(userVisiblePath)
  368. }
  369. linkname := userVisiblePath[:slashpos]
  370. _, err := os.Readlink(filepath.Join(w.targetDir, linkname))
  371. if err != nil && os.IsNotExist(err) {
  372. // The link into the data directory for this path doesn't exist; create it
  373. visibleFile := filepath.Join(w.targetDir, linkname)
  374. dataDirFile := filepath.Join(dataDirName, linkname)
  375. err = os.Symlink(dataDirFile, visibleFile)
  376. if err != nil {
  377. return err
  378. }
  379. }
  380. }
  381. return nil
  382. }
  383. // removeUserVisiblePaths removes the set of paths from the user-visible
  384. // portion of the writer's target directory.
  385. func (w *AtomicWriter) removeUserVisiblePaths(paths sets.String) error {
  386. ps := string(os.PathSeparator)
  387. var lasterr error
  388. for p := range paths {
  389. // only remove symlinks from the volume root directory (i.e. items that don't contain '/')
  390. if strings.Contains(p, ps) {
  391. continue
  392. }
  393. if err := os.Remove(filepath.Join(w.targetDir, p)); err != nil {
  394. klog.Errorf("%s: error pruning old user-visible path %s: %v", w.logContext, p, err)
  395. lasterr = err
  396. }
  397. }
  398. return lasterr
  399. }