datastore_file.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. /*
  2. Copyright (c) 2016-2017 VMware, Inc. All Rights Reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package object
  14. import (
  15. "bytes"
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net/http"
  21. "os"
  22. "path"
  23. "sync"
  24. "time"
  25. "github.com/vmware/govmomi/vim25/soap"
  26. )
  27. // DatastoreFile implements io.Reader, io.Seeker and io.Closer interfaces for datastore file access.
  28. type DatastoreFile struct {
  29. d Datastore
  30. ctx context.Context
  31. name string
  32. buf io.Reader
  33. body io.ReadCloser
  34. length int64
  35. offset struct {
  36. read, seek int64
  37. }
  38. }
  39. // Open opens the named file relative to the Datastore.
  40. func (d Datastore) Open(ctx context.Context, name string) (*DatastoreFile, error) {
  41. return &DatastoreFile{
  42. d: d,
  43. name: name,
  44. length: -1,
  45. ctx: ctx,
  46. }, nil
  47. }
  48. // Read reads up to len(b) bytes from the DatastoreFile.
  49. func (f *DatastoreFile) Read(b []byte) (int, error) {
  50. if f.offset.read != f.offset.seek {
  51. // A Seek() call changed the offset, we need to issue a new GET
  52. _ = f.Close()
  53. f.offset.read = f.offset.seek
  54. } else if f.buf != nil {
  55. // f.buf + f behaves like an io.MultiReader
  56. n, err := f.buf.Read(b)
  57. if err == io.EOF {
  58. f.buf = nil // buffer has been drained
  59. }
  60. if n > 0 {
  61. return n, nil
  62. }
  63. }
  64. body, err := f.get()
  65. if err != nil {
  66. return 0, err
  67. }
  68. n, err := body.Read(b)
  69. f.offset.read += int64(n)
  70. f.offset.seek += int64(n)
  71. return n, err
  72. }
  73. // Close closes the DatastoreFile.
  74. func (f *DatastoreFile) Close() error {
  75. var err error
  76. if f.body != nil {
  77. err = f.body.Close()
  78. f.body = nil
  79. }
  80. f.buf = nil
  81. return err
  82. }
  83. // Seek sets the offset for the next Read on the DatastoreFile.
  84. func (f *DatastoreFile) Seek(offset int64, whence int) (int64, error) {
  85. switch whence {
  86. case io.SeekStart:
  87. case io.SeekCurrent:
  88. offset += f.offset.seek
  89. case io.SeekEnd:
  90. if f.length < 0 {
  91. _, err := f.Stat()
  92. if err != nil {
  93. return 0, err
  94. }
  95. }
  96. offset += f.length
  97. default:
  98. return 0, errors.New("Seek: invalid whence")
  99. }
  100. // allow negative SeekStart for initial Range request
  101. if offset < 0 {
  102. return 0, errors.New("Seek: invalid offset")
  103. }
  104. f.offset.seek = offset
  105. return offset, nil
  106. }
  107. type fileStat struct {
  108. file *DatastoreFile
  109. header http.Header
  110. }
  111. func (s *fileStat) Name() string {
  112. return path.Base(s.file.name)
  113. }
  114. func (s *fileStat) Size() int64 {
  115. return s.file.length
  116. }
  117. func (s *fileStat) Mode() os.FileMode {
  118. return 0
  119. }
  120. func (s *fileStat) ModTime() time.Time {
  121. return time.Now() // no Last-Modified
  122. }
  123. func (s *fileStat) IsDir() bool {
  124. return false
  125. }
  126. func (s *fileStat) Sys() interface{} {
  127. return s.header
  128. }
  // statusError converts an unexpected HTTP response into an error:
  // os.ErrNotExist for 404 Not Found, otherwise an error carrying res.Status.
  func statusError(res *http.Response) error {
  	switch res.StatusCode {
  	case http.StatusNotFound:
  		return os.ErrNotExist
  	default:
  		return errors.New(res.Status)
  	}
  }
  135. // Stat returns the os.FileInfo interface describing file.
  136. func (f *DatastoreFile) Stat() (os.FileInfo, error) {
  137. // TODO: consider using Datastore.Stat() instead
  138. u, p, err := f.d.downloadTicket(f.ctx, f.name, &soap.Download{Method: "HEAD"})
  139. if err != nil {
  140. return nil, err
  141. }
  142. res, err := f.d.Client().DownloadRequest(f.ctx, u, p)
  143. if err != nil {
  144. return nil, err
  145. }
  146. if res.StatusCode != http.StatusOK {
  147. return nil, statusError(res)
  148. }
  149. f.length = res.ContentLength
  150. return &fileStat{f, res.Header}, nil
  151. }
  152. func (f *DatastoreFile) get() (io.Reader, error) {
  153. if f.body != nil {
  154. return f.body, nil
  155. }
  156. u, p, err := f.d.downloadTicket(f.ctx, f.name, nil)
  157. if err != nil {
  158. return nil, err
  159. }
  160. if f.offset.read != 0 {
  161. p.Headers = map[string]string{
  162. "Range": fmt.Sprintf("bytes=%d-", f.offset.read),
  163. }
  164. }
  165. res, err := f.d.Client().DownloadRequest(f.ctx, u, p)
  166. if err != nil {
  167. return nil, err
  168. }
  169. switch res.StatusCode {
  170. case http.StatusOK:
  171. f.length = res.ContentLength
  172. case http.StatusPartialContent:
  173. var start, end int
  174. cr := res.Header.Get("Content-Range")
  175. _, err = fmt.Sscanf(cr, "bytes %d-%d/%d", &start, &end, &f.length)
  176. if err != nil {
  177. f.length = -1
  178. }
  179. case http.StatusRequestedRangeNotSatisfiable:
  180. // ok: Read() will return io.EOF
  181. default:
  182. return nil, statusError(res)
  183. }
  184. if f.length < 0 {
  185. _ = res.Body.Close()
  186. return nil, errors.New("unable to determine file size")
  187. }
  188. f.body = res.Body
  189. return f.body, nil
  190. }
  // lastIndexLines walks s backwards one line at a time, starting at its last
  // byte. For each line it calls include with the current value of *line and the
  // line text (through the byte that ends it); while include returns true, *line
  // is incremented and the walk continues at the preceding newline.
  //
  // It returns the index the walk stopped at and whether it stopped because
  // include returned false. The walk also ends, reporting false, when no earlier
  // newline is left in s.
  func lastIndexLines(s []byte, line *int, include func(l int, m string) bool) (int64, bool) {
  	end := len(s) - 1

  	for end > 0 {
  		nl := bytes.LastIndexByte(s[:end], '\n')
  		if nl < 0 {
  			return int64(end), false
  		}

  		if !include(*line, string(s[nl+1:end+1])) {
  			return int64(end), true
  		}

  		end = nl
  		*line++
  	}

  	return int64(end), false
  }
  210. // Tail seeks to the position of the last N lines of the file.
  211. func (f *DatastoreFile) Tail(n int) error {
  212. return f.TailFunc(n, func(line int, _ string) bool { return n > line })
  213. }
  214. // TailFunc will seek backwards in the datastore file until it hits a line that does
  215. // not satisfy the supplied `include` function.
  216. func (f *DatastoreFile) TailFunc(lines int, include func(line int, message string) bool) error {
  217. // Read the file in reverse using bsize chunks
  218. const bsize = int64(1024 * 16)
  219. fsize, err := f.Seek(0, io.SeekEnd)
  220. if err != nil {
  221. return err
  222. }
  223. if lines == 0 {
  224. return nil
  225. }
  226. chunk := int64(-1)
  227. buf := bytes.NewBuffer(make([]byte, 0, bsize))
  228. line := 0
  229. for {
  230. var eof bool
  231. var pos int64
  232. nread := bsize
  233. offset := chunk * bsize
  234. remain := fsize + offset
  235. if remain < 0 {
  236. if pos, err = f.Seek(0, io.SeekStart); err != nil {
  237. return err
  238. }
  239. nread = bsize + remain
  240. eof = true
  241. } else {
  242. if pos, err = f.Seek(offset, io.SeekEnd); err != nil {
  243. return err
  244. }
  245. }
  246. if _, err = io.CopyN(buf, f, nread); err != nil {
  247. if err != io.EOF {
  248. return err
  249. }
  250. }
  251. b := buf.Bytes()
  252. idx, done := lastIndexLines(b, &line, include)
  253. if done {
  254. if chunk == -1 {
  255. // We found all N lines in the last chunk of the file.
  256. // The seek offset is also now at the current end of file.
  257. // Save this buffer to avoid another GET request when Read() is called.
  258. buf.Next(int(idx + 1))
  259. f.buf = buf
  260. return nil
  261. }
  262. if _, err = f.Seek(pos+idx+1, io.SeekStart); err != nil {
  263. return err
  264. }
  265. break
  266. }
  267. if eof {
  268. if remain < 0 {
  269. // We found < N lines in the entire file, so seek to the start.
  270. _, _ = f.Seek(0, io.SeekStart)
  271. }
  272. break
  273. }
  274. chunk--
  275. buf.Reset()
  276. }
  277. return nil
  278. }
  279. type followDatastoreFile struct {
  280. r *DatastoreFile
  281. c chan struct{}
  282. i time.Duration
  283. o sync.Once
  284. }
  285. // Read reads up to len(b) bytes from the DatastoreFile being followed.
  286. // This method will block until data is read, an error other than io.EOF is returned or Close() is called.
  287. func (f *followDatastoreFile) Read(p []byte) (int, error) {
  288. offset := f.r.offset.seek
  289. stop := false
  290. for {
  291. n, err := f.r.Read(p)
  292. if err != nil && err == io.EOF {
  293. _ = f.r.Close() // GET request body has been drained.
  294. if stop {
  295. return n, err
  296. }
  297. err = nil
  298. }
  299. if n > 0 {
  300. return n, err
  301. }
  302. select {
  303. case <-f.c:
  304. // Wake up and stop polling once the body has been drained
  305. stop = true
  306. case <-time.After(f.i):
  307. }
  308. info, serr := f.r.Stat()
  309. if serr != nil {
  310. // Return EOF rather than 404 if the file goes away
  311. if serr == os.ErrNotExist {
  312. _ = f.r.Close()
  313. return 0, io.EOF
  314. }
  315. return 0, serr
  316. }
  317. if info.Size() < offset {
  318. // assume file has be truncated
  319. offset, err = f.r.Seek(0, io.SeekStart)
  320. if err != nil {
  321. return 0, err
  322. }
  323. }
  324. }
  325. }
  326. // Close will stop Follow polling and close the underlying DatastoreFile.
  327. func (f *followDatastoreFile) Close() error {
  328. f.o.Do(func() { close(f.c) })
  329. return nil
  330. }
  331. // Follow returns an io.ReadCloser to stream the file contents as data is appended.
  332. func (f *DatastoreFile) Follow(interval time.Duration) io.ReadCloser {
  333. return &followDatastoreFile{
  334. r: f,
  335. c: make(chan struct{}),
  336. i: interval,
  337. }
  338. }