client.go 28 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129
  1. package sftp
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "io"
  6. "os"
  7. "path"
  8. "sync/atomic"
  9. "time"
  10. "github.com/kr/fs"
  11. "github.com/pkg/errors"
  12. "golang.org/x/crypto/ssh"
  13. )
  14. // MaxPacket sets the maximum size of the payload.
  15. func MaxPacket(size int) func(*Client) error {
  16. return func(c *Client) error {
  17. if size < 1<<15 {
  18. return errors.Errorf("size must be greater or equal to 32k")
  19. }
  20. c.maxPacket = size
  21. return nil
  22. }
  23. }
  24. // NewClient creates a new SFTP client on conn, using zero or more option
  25. // functions.
  26. func NewClient(conn *ssh.Client, opts ...func(*Client) error) (*Client, error) {
  27. s, err := conn.NewSession()
  28. if err != nil {
  29. return nil, err
  30. }
  31. if err := s.RequestSubsystem("sftp"); err != nil {
  32. return nil, err
  33. }
  34. pw, err := s.StdinPipe()
  35. if err != nil {
  36. return nil, err
  37. }
  38. pr, err := s.StdoutPipe()
  39. if err != nil {
  40. return nil, err
  41. }
  42. return NewClientPipe(pr, pw, opts...)
  43. }
  44. // NewClientPipe creates a new SFTP client given a Reader and a WriteCloser.
  45. // This can be used for connecting to an SFTP server over TCP/TLS or by using
  46. // the system's ssh client program (e.g. via exec.Command).
  47. func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...func(*Client) error) (*Client, error) {
  48. sftp := &Client{
  49. clientConn: clientConn{
  50. conn: conn{
  51. Reader: rd,
  52. WriteCloser: wr,
  53. },
  54. inflight: make(map[uint32]chan<- result),
  55. },
  56. maxPacket: 1 << 15,
  57. }
  58. if err := sftp.applyOptions(opts...); err != nil {
  59. wr.Close()
  60. return nil, err
  61. }
  62. if err := sftp.sendInit(); err != nil {
  63. wr.Close()
  64. return nil, err
  65. }
  66. if err := sftp.recvVersion(); err != nil {
  67. wr.Close()
  68. return nil, err
  69. }
  70. sftp.clientConn.wg.Add(1)
  71. go sftp.loop()
  72. return sftp, nil
  73. }
  74. // Client represents an SFTP session on a *ssh.ClientConn SSH connection.
  75. // Multiple Clients can be active on a single SSH connection, and a Client
  76. // may be called concurrently from multiple Goroutines.
  77. //
  78. // Client implements the github.com/kr/fs.FileSystem interface.
  79. type Client struct {
  80. clientConn
  81. maxPacket int // max packet size read or written.
  82. nextid uint32
  83. }
  84. // Create creates the named file mode 0666 (before umask), truncating it if
  85. // it already exists. If successful, methods on the returned File can be
  86. // used for I/O; the associated file descriptor has mode O_RDWR.
  87. func (c *Client) Create(path string) (*File, error) {
  88. return c.open(path, flags(os.O_RDWR|os.O_CREATE|os.O_TRUNC))
  89. }
  90. const sftpProtocolVersion = 3 // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02
  91. func (c *Client) sendInit() error {
  92. return c.clientConn.conn.sendPacket(sshFxInitPacket{
  93. Version: sftpProtocolVersion, // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02
  94. })
  95. }
  96. // returns the next value of c.nextid
  97. func (c *Client) nextID() uint32 {
  98. return atomic.AddUint32(&c.nextid, 1)
  99. }
  100. func (c *Client) recvVersion() error {
  101. typ, data, err := c.recvPacket()
  102. if err != nil {
  103. return err
  104. }
  105. if typ != ssh_FXP_VERSION {
  106. return &unexpectedPacketErr{ssh_FXP_VERSION, typ}
  107. }
  108. version, _ := unmarshalUint32(data)
  109. if version != sftpProtocolVersion {
  110. return &unexpectedVersionErr{sftpProtocolVersion, version}
  111. }
  112. return nil
  113. }
  114. // Walk returns a new Walker rooted at root.
  115. func (c *Client) Walk(root string) *fs.Walker {
  116. return fs.WalkFS(root, c)
  117. }
  118. // ReadDir reads the directory named by dirname and returns a list of
  119. // directory entries.
  120. func (c *Client) ReadDir(p string) ([]os.FileInfo, error) {
  121. handle, err := c.opendir(p)
  122. if err != nil {
  123. return nil, err
  124. }
  125. defer c.close(handle) // this has to defer earlier than the lock below
  126. var attrs []os.FileInfo
  127. var done = false
  128. for !done {
  129. id := c.nextID()
  130. typ, data, err1 := c.sendPacket(sshFxpReaddirPacket{
  131. ID: id,
  132. Handle: handle,
  133. })
  134. if err1 != nil {
  135. err = err1
  136. done = true
  137. break
  138. }
  139. switch typ {
  140. case ssh_FXP_NAME:
  141. sid, data := unmarshalUint32(data)
  142. if sid != id {
  143. return nil, &unexpectedIDErr{id, sid}
  144. }
  145. count, data := unmarshalUint32(data)
  146. for i := uint32(0); i < count; i++ {
  147. var filename string
  148. filename, data = unmarshalString(data)
  149. _, data = unmarshalString(data) // discard longname
  150. var attr *FileStat
  151. attr, data = unmarshalAttrs(data)
  152. if filename == "." || filename == ".." {
  153. continue
  154. }
  155. attrs = append(attrs, fileInfoFromStat(attr, path.Base(filename)))
  156. }
  157. case ssh_FXP_STATUS:
  158. // TODO(dfc) scope warning!
  159. err = normaliseError(unmarshalStatus(id, data))
  160. done = true
  161. default:
  162. return nil, unimplementedPacketErr(typ)
  163. }
  164. }
  165. if err == io.EOF {
  166. err = nil
  167. }
  168. return attrs, err
  169. }
  170. func (c *Client) opendir(path string) (string, error) {
  171. id := c.nextID()
  172. typ, data, err := c.sendPacket(sshFxpOpendirPacket{
  173. ID: id,
  174. Path: path,
  175. })
  176. if err != nil {
  177. return "", err
  178. }
  179. switch typ {
  180. case ssh_FXP_HANDLE:
  181. sid, data := unmarshalUint32(data)
  182. if sid != id {
  183. return "", &unexpectedIDErr{id, sid}
  184. }
  185. handle, _ := unmarshalString(data)
  186. return handle, nil
  187. case ssh_FXP_STATUS:
  188. return "", unmarshalStatus(id, data)
  189. default:
  190. return "", unimplementedPacketErr(typ)
  191. }
  192. }
  193. // Stat returns a FileInfo structure describing the file specified by path 'p'.
  194. // If 'p' is a symbolic link, the returned FileInfo structure describes the referent file.
  195. func (c *Client) Stat(p string) (os.FileInfo, error) {
  196. id := c.nextID()
  197. typ, data, err := c.sendPacket(sshFxpStatPacket{
  198. ID: id,
  199. Path: p,
  200. })
  201. if err != nil {
  202. return nil, err
  203. }
  204. switch typ {
  205. case ssh_FXP_ATTRS:
  206. sid, data := unmarshalUint32(data)
  207. if sid != id {
  208. return nil, &unexpectedIDErr{id, sid}
  209. }
  210. attr, _ := unmarshalAttrs(data)
  211. return fileInfoFromStat(attr, path.Base(p)), nil
  212. case ssh_FXP_STATUS:
  213. return nil, normaliseError(unmarshalStatus(id, data))
  214. default:
  215. return nil, unimplementedPacketErr(typ)
  216. }
  217. }
  218. // Lstat returns a FileInfo structure describing the file specified by path 'p'.
  219. // If 'p' is a symbolic link, the returned FileInfo structure describes the symbolic link.
  220. func (c *Client) Lstat(p string) (os.FileInfo, error) {
  221. id := c.nextID()
  222. typ, data, err := c.sendPacket(sshFxpLstatPacket{
  223. ID: id,
  224. Path: p,
  225. })
  226. if err != nil {
  227. return nil, err
  228. }
  229. switch typ {
  230. case ssh_FXP_ATTRS:
  231. sid, data := unmarshalUint32(data)
  232. if sid != id {
  233. return nil, &unexpectedIDErr{id, sid}
  234. }
  235. attr, _ := unmarshalAttrs(data)
  236. return fileInfoFromStat(attr, path.Base(p)), nil
  237. case ssh_FXP_STATUS:
  238. return nil, normaliseError(unmarshalStatus(id, data))
  239. default:
  240. return nil, unimplementedPacketErr(typ)
  241. }
  242. }
  243. // ReadLink reads the target of a symbolic link.
  244. func (c *Client) ReadLink(p string) (string, error) {
  245. id := c.nextID()
  246. typ, data, err := c.sendPacket(sshFxpReadlinkPacket{
  247. ID: id,
  248. Path: p,
  249. })
  250. if err != nil {
  251. return "", err
  252. }
  253. switch typ {
  254. case ssh_FXP_NAME:
  255. sid, data := unmarshalUint32(data)
  256. if sid != id {
  257. return "", &unexpectedIDErr{id, sid}
  258. }
  259. count, data := unmarshalUint32(data)
  260. if count != 1 {
  261. return "", unexpectedCount(1, count)
  262. }
  263. filename, _ := unmarshalString(data) // ignore dummy attributes
  264. return filename, nil
  265. case ssh_FXP_STATUS:
  266. return "", unmarshalStatus(id, data)
  267. default:
  268. return "", unimplementedPacketErr(typ)
  269. }
  270. }
  271. // Symlink creates a symbolic link at 'newname', pointing at target 'oldname'
  272. func (c *Client) Symlink(oldname, newname string) error {
  273. id := c.nextID()
  274. typ, data, err := c.sendPacket(sshFxpSymlinkPacket{
  275. ID: id,
  276. Linkpath: newname,
  277. Targetpath: oldname,
  278. })
  279. if err != nil {
  280. return err
  281. }
  282. switch typ {
  283. case ssh_FXP_STATUS:
  284. return normaliseError(unmarshalStatus(id, data))
  285. default:
  286. return unimplementedPacketErr(typ)
  287. }
  288. }
  289. // setstat is a convience wrapper to allow for changing of various parts of the file descriptor.
  290. func (c *Client) setstat(path string, flags uint32, attrs interface{}) error {
  291. id := c.nextID()
  292. typ, data, err := c.sendPacket(sshFxpSetstatPacket{
  293. ID: id,
  294. Path: path,
  295. Flags: flags,
  296. Attrs: attrs,
  297. })
  298. if err != nil {
  299. return err
  300. }
  301. switch typ {
  302. case ssh_FXP_STATUS:
  303. return normaliseError(unmarshalStatus(id, data))
  304. default:
  305. return unimplementedPacketErr(typ)
  306. }
  307. }
  308. // Chtimes changes the access and modification times of the named file.
  309. func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error {
  310. type times struct {
  311. Atime uint32
  312. Mtime uint32
  313. }
  314. attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())}
  315. return c.setstat(path, ssh_FILEXFER_ATTR_ACMODTIME, attrs)
  316. }
  317. // Chown changes the user and group owners of the named file.
  318. func (c *Client) Chown(path string, uid, gid int) error {
  319. type owner struct {
  320. UID uint32
  321. GID uint32
  322. }
  323. attrs := owner{uint32(uid), uint32(gid)}
  324. return c.setstat(path, ssh_FILEXFER_ATTR_UIDGID, attrs)
  325. }
  326. // Chmod changes the permissions of the named file.
  327. func (c *Client) Chmod(path string, mode os.FileMode) error {
  328. return c.setstat(path, ssh_FILEXFER_ATTR_PERMISSIONS, uint32(mode))
  329. }
  330. // Truncate sets the size of the named file. Although it may be safely assumed
  331. // that if the size is less than its current size it will be truncated to fit,
  332. // the SFTP protocol does not specify what behavior the server should do when setting
  333. // size greater than the current size.
  334. func (c *Client) Truncate(path string, size int64) error {
  335. return c.setstat(path, ssh_FILEXFER_ATTR_SIZE, uint64(size))
  336. }
  337. // Open opens the named file for reading. If successful, methods on the
  338. // returned file can be used for reading; the associated file descriptor
  339. // has mode O_RDONLY.
  340. func (c *Client) Open(path string) (*File, error) {
  341. return c.open(path, flags(os.O_RDONLY))
  342. }
  343. // OpenFile is the generalized open call; most users will use Open or
  344. // Create instead. It opens the named file with specified flag (O_RDONLY
  345. // etc.). If successful, methods on the returned File can be used for I/O.
  346. func (c *Client) OpenFile(path string, f int) (*File, error) {
  347. return c.open(path, flags(f))
  348. }
  349. func (c *Client) open(path string, pflags uint32) (*File, error) {
  350. id := c.nextID()
  351. typ, data, err := c.sendPacket(sshFxpOpenPacket{
  352. ID: id,
  353. Path: path,
  354. Pflags: pflags,
  355. })
  356. if err != nil {
  357. return nil, err
  358. }
  359. switch typ {
  360. case ssh_FXP_HANDLE:
  361. sid, data := unmarshalUint32(data)
  362. if sid != id {
  363. return nil, &unexpectedIDErr{id, sid}
  364. }
  365. handle, _ := unmarshalString(data)
  366. return &File{c: c, path: path, handle: handle}, nil
  367. case ssh_FXP_STATUS:
  368. return nil, normaliseError(unmarshalStatus(id, data))
  369. default:
  370. return nil, unimplementedPacketErr(typ)
  371. }
  372. }
  373. // close closes a handle handle previously returned in the response
  374. // to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid
  375. // immediately after this request has been sent.
  376. func (c *Client) close(handle string) error {
  377. id := c.nextID()
  378. typ, data, err := c.sendPacket(sshFxpClosePacket{
  379. ID: id,
  380. Handle: handle,
  381. })
  382. if err != nil {
  383. return err
  384. }
  385. switch typ {
  386. case ssh_FXP_STATUS:
  387. return normaliseError(unmarshalStatus(id, data))
  388. default:
  389. return unimplementedPacketErr(typ)
  390. }
  391. }
  392. func (c *Client) fstat(handle string) (*FileStat, error) {
  393. id := c.nextID()
  394. typ, data, err := c.sendPacket(sshFxpFstatPacket{
  395. ID: id,
  396. Handle: handle,
  397. })
  398. if err != nil {
  399. return nil, err
  400. }
  401. switch typ {
  402. case ssh_FXP_ATTRS:
  403. sid, data := unmarshalUint32(data)
  404. if sid != id {
  405. return nil, &unexpectedIDErr{id, sid}
  406. }
  407. attr, _ := unmarshalAttrs(data)
  408. return attr, nil
  409. case ssh_FXP_STATUS:
  410. return nil, unmarshalStatus(id, data)
  411. default:
  412. return nil, unimplementedPacketErr(typ)
  413. }
  414. }
  415. // StatVFS retrieves VFS statistics from a remote host.
  416. //
  417. // It implements the statvfs@openssh.com SSH_FXP_EXTENDED feature
  418. // from http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt.
  419. func (c *Client) StatVFS(path string) (*StatVFS, error) {
  420. // send the StatVFS packet to the server
  421. id := c.nextID()
  422. typ, data, err := c.sendPacket(sshFxpStatvfsPacket{
  423. ID: id,
  424. Path: path,
  425. })
  426. if err != nil {
  427. return nil, err
  428. }
  429. switch typ {
  430. // server responded with valid data
  431. case ssh_FXP_EXTENDED_REPLY:
  432. var response StatVFS
  433. err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response)
  434. if err != nil {
  435. return nil, errors.New("can not parse reply")
  436. }
  437. return &response, nil
  438. // the resquest failed
  439. case ssh_FXP_STATUS:
  440. return nil, errors.New(fxp(ssh_FXP_STATUS).String())
  441. default:
  442. return nil, unimplementedPacketErr(typ)
  443. }
  444. }
  445. // Join joins any number of path elements into a single path, adding a
  446. // separating slash if necessary. The result is Cleaned; in particular, all
  447. // empty strings are ignored.
  448. func (c *Client) Join(elem ...string) string { return path.Join(elem...) }
  449. // Remove removes the specified file or directory. An error will be returned if no
  450. // file or directory with the specified path exists, or if the specified directory
  451. // is not empty.
  452. func (c *Client) Remove(path string) error {
  453. err := c.removeFile(path)
  454. if err, ok := err.(*StatusError); ok {
  455. switch err.Code {
  456. // some servers, *cough* osx *cough*, return EPERM, not ENODIR.
  457. // serv-u returns ssh_FX_FILE_IS_A_DIRECTORY
  458. case ssh_FX_PERMISSION_DENIED, ssh_FX_FAILURE, ssh_FX_FILE_IS_A_DIRECTORY:
  459. return c.removeDirectory(path)
  460. }
  461. }
  462. return err
  463. }
  464. func (c *Client) removeFile(path string) error {
  465. id := c.nextID()
  466. typ, data, err := c.sendPacket(sshFxpRemovePacket{
  467. ID: id,
  468. Filename: path,
  469. })
  470. if err != nil {
  471. return err
  472. }
  473. switch typ {
  474. case ssh_FXP_STATUS:
  475. return normaliseError(unmarshalStatus(id, data))
  476. default:
  477. return unimplementedPacketErr(typ)
  478. }
  479. }
  480. func (c *Client) removeDirectory(path string) error {
  481. id := c.nextID()
  482. typ, data, err := c.sendPacket(sshFxpRmdirPacket{
  483. ID: id,
  484. Path: path,
  485. })
  486. if err != nil {
  487. return err
  488. }
  489. switch typ {
  490. case ssh_FXP_STATUS:
  491. return normaliseError(unmarshalStatus(id, data))
  492. default:
  493. return unimplementedPacketErr(typ)
  494. }
  495. }
  496. // Rename renames a file.
  497. func (c *Client) Rename(oldname, newname string) error {
  498. id := c.nextID()
  499. typ, data, err := c.sendPacket(sshFxpRenamePacket{
  500. ID: id,
  501. Oldpath: oldname,
  502. Newpath: newname,
  503. })
  504. if err != nil {
  505. return err
  506. }
  507. switch typ {
  508. case ssh_FXP_STATUS:
  509. return normaliseError(unmarshalStatus(id, data))
  510. default:
  511. return unimplementedPacketErr(typ)
  512. }
  513. }
  514. func (c *Client) realpath(path string) (string, error) {
  515. id := c.nextID()
  516. typ, data, err := c.sendPacket(sshFxpRealpathPacket{
  517. ID: id,
  518. Path: path,
  519. })
  520. if err != nil {
  521. return "", err
  522. }
  523. switch typ {
  524. case ssh_FXP_NAME:
  525. sid, data := unmarshalUint32(data)
  526. if sid != id {
  527. return "", &unexpectedIDErr{id, sid}
  528. }
  529. count, data := unmarshalUint32(data)
  530. if count != 1 {
  531. return "", unexpectedCount(1, count)
  532. }
  533. filename, _ := unmarshalString(data) // ignore attributes
  534. return filename, nil
  535. case ssh_FXP_STATUS:
  536. return "", normaliseError(unmarshalStatus(id, data))
  537. default:
  538. return "", unimplementedPacketErr(typ)
  539. }
  540. }
  541. // Getwd returns the current working directory of the server. Operations
  542. // involving relative paths will be based at this location.
  543. func (c *Client) Getwd() (string, error) {
  544. return c.realpath(".")
  545. }
  546. // Mkdir creates the specified directory. An error will be returned if a file or
  547. // directory with the specified path already exists, or if the directory's
  548. // parent folder does not exist (the method cannot create complete paths).
  549. func (c *Client) Mkdir(path string) error {
  550. id := c.nextID()
  551. typ, data, err := c.sendPacket(sshFxpMkdirPacket{
  552. ID: id,
  553. Path: path,
  554. })
  555. if err != nil {
  556. return err
  557. }
  558. switch typ {
  559. case ssh_FXP_STATUS:
  560. return normaliseError(unmarshalStatus(id, data))
  561. default:
  562. return unimplementedPacketErr(typ)
  563. }
  564. }
  565. // applyOptions applies options functions to the Client.
  566. // If an error is encountered, option processing ceases.
  567. func (c *Client) applyOptions(opts ...func(*Client) error) error {
  568. for _, f := range opts {
  569. if err := f(c); err != nil {
  570. return err
  571. }
  572. }
  573. return nil
  574. }
  575. // File represents a remote file.
  576. type File struct {
  577. c *Client
  578. path string
  579. handle string
  580. offset uint64 // current offset within remote file
  581. }
  582. // Close closes the File, rendering it unusable for I/O. It returns an
  583. // error, if any.
  584. func (f *File) Close() error {
  585. return f.c.close(f.handle)
  586. }
  587. // Name returns the name of the file as presented to Open or Create.
  588. func (f *File) Name() string {
  589. return f.path
  590. }
  591. const maxConcurrentRequests = 64
  592. // Read reads up to len(b) bytes from the File. It returns the number of
  593. // bytes read and an error, if any. EOF is signaled by a zero count with
  594. // err set to io.EOF.
  595. func (f *File) Read(b []byte) (int, error) {
  596. // Split the read into multiple maxPacket sized concurrent reads
  597. // bounded by maxConcurrentRequests. This allows reads with a suitably
  598. // large buffer to transfer data at a much faster rate due to
  599. // overlapping round trip times.
  600. inFlight := 0
  601. desiredInFlight := 1
  602. offset := f.offset
  603. ch := make(chan result, 1)
  604. type inflightRead struct {
  605. b []byte
  606. offset uint64
  607. }
  608. reqs := map[uint32]inflightRead{}
  609. type offsetErr struct {
  610. offset uint64
  611. err error
  612. }
  613. var firstErr offsetErr
  614. sendReq := func(b []byte, offset uint64) {
  615. reqID := f.c.nextID()
  616. f.c.dispatchRequest(ch, sshFxpReadPacket{
  617. ID: reqID,
  618. Handle: f.handle,
  619. Offset: offset,
  620. Len: uint32(len(b)),
  621. })
  622. inFlight++
  623. reqs[reqID] = inflightRead{b: b, offset: offset}
  624. }
  625. var read int
  626. for len(b) > 0 || inFlight > 0 {
  627. for inFlight < desiredInFlight && len(b) > 0 && firstErr.err == nil {
  628. l := min(len(b), f.c.maxPacket)
  629. rb := b[:l]
  630. sendReq(rb, offset)
  631. offset += uint64(l)
  632. b = b[l:]
  633. }
  634. if inFlight == 0 {
  635. break
  636. }
  637. select {
  638. case res := <-ch:
  639. inFlight--
  640. if res.err != nil {
  641. firstErr = offsetErr{offset: 0, err: res.err}
  642. break
  643. }
  644. reqID, data := unmarshalUint32(res.data)
  645. req, ok := reqs[reqID]
  646. if !ok {
  647. firstErr = offsetErr{offset: 0, err: errors.Errorf("sid: %v not found", reqID)}
  648. break
  649. }
  650. delete(reqs, reqID)
  651. switch res.typ {
  652. case ssh_FXP_STATUS:
  653. if firstErr.err == nil || req.offset < firstErr.offset {
  654. firstErr = offsetErr{
  655. offset: req.offset,
  656. err: normaliseError(unmarshalStatus(reqID, res.data)),
  657. }
  658. break
  659. }
  660. case ssh_FXP_DATA:
  661. l, data := unmarshalUint32(data)
  662. n := copy(req.b, data[:l])
  663. read += n
  664. if n < len(req.b) {
  665. sendReq(req.b[l:], req.offset+uint64(l))
  666. }
  667. if desiredInFlight < maxConcurrentRequests {
  668. desiredInFlight++
  669. }
  670. default:
  671. firstErr = offsetErr{offset: 0, err: unimplementedPacketErr(res.typ)}
  672. break
  673. }
  674. }
  675. }
  676. // If the error is anything other than EOF, then there
  677. // may be gaps in the data copied to the buffer so it's
  678. // best to return 0 so the caller can't make any
  679. // incorrect assumptions about the state of the buffer.
  680. if firstErr.err != nil && firstErr.err != io.EOF {
  681. read = 0
  682. }
  683. f.offset += uint64(read)
  684. return read, firstErr.err
  685. }
  686. // WriteTo writes the file to w. The return value is the number of bytes
  687. // written. Any error encountered during the write is also returned.
  688. func (f *File) WriteTo(w io.Writer) (int64, error) {
  689. fi, err := f.Stat()
  690. if err != nil {
  691. return 0, err
  692. }
  693. inFlight := 0
  694. desiredInFlight := 1
  695. offset := f.offset
  696. writeOffset := offset
  697. fileSize := uint64(fi.Size())
  698. ch := make(chan result, 1)
  699. type inflightRead struct {
  700. b []byte
  701. offset uint64
  702. }
  703. reqs := map[uint32]inflightRead{}
  704. pendingWrites := map[uint64][]byte{}
  705. type offsetErr struct {
  706. offset uint64
  707. err error
  708. }
  709. var firstErr offsetErr
  710. sendReq := func(b []byte, offset uint64) {
  711. reqID := f.c.nextID()
  712. f.c.dispatchRequest(ch, sshFxpReadPacket{
  713. ID: reqID,
  714. Handle: f.handle,
  715. Offset: offset,
  716. Len: uint32(len(b)),
  717. })
  718. inFlight++
  719. reqs[reqID] = inflightRead{b: b, offset: offset}
  720. }
  721. var copied int64
  722. for firstErr.err == nil || inFlight > 0 {
  723. for inFlight < desiredInFlight && firstErr.err == nil {
  724. b := make([]byte, f.c.maxPacket)
  725. sendReq(b, offset)
  726. offset += uint64(f.c.maxPacket)
  727. if offset > fileSize {
  728. desiredInFlight = 1
  729. }
  730. }
  731. if inFlight == 0 {
  732. break
  733. }
  734. select {
  735. case res := <-ch:
  736. inFlight--
  737. if res.err != nil {
  738. firstErr = offsetErr{offset: 0, err: res.err}
  739. break
  740. }
  741. reqID, data := unmarshalUint32(res.data)
  742. req, ok := reqs[reqID]
  743. if !ok {
  744. firstErr = offsetErr{offset: 0, err: errors.Errorf("sid: %v not found", reqID)}
  745. break
  746. }
  747. delete(reqs, reqID)
  748. switch res.typ {
  749. case ssh_FXP_STATUS:
  750. if firstErr.err == nil || req.offset < firstErr.offset {
  751. firstErr = offsetErr{offset: req.offset, err: normaliseError(unmarshalStatus(reqID, res.data))}
  752. break
  753. }
  754. case ssh_FXP_DATA:
  755. l, data := unmarshalUint32(data)
  756. if req.offset == writeOffset {
  757. nbytes, err := w.Write(data)
  758. copied += int64(nbytes)
  759. if err != nil {
  760. firstErr = offsetErr{offset: req.offset + uint64(nbytes), err: err}
  761. break
  762. }
  763. if nbytes < int(l) {
  764. firstErr = offsetErr{offset: req.offset + uint64(nbytes), err: io.ErrShortWrite}
  765. break
  766. }
  767. switch {
  768. case offset > fileSize:
  769. desiredInFlight = 1
  770. case desiredInFlight < maxConcurrentRequests:
  771. desiredInFlight++
  772. }
  773. writeOffset += uint64(nbytes)
  774. for pendingData, ok := pendingWrites[writeOffset]; ok; pendingData, ok = pendingWrites[writeOffset] {
  775. nbytes, err := w.Write(pendingData)
  776. if err != nil {
  777. firstErr = offsetErr{offset: writeOffset + uint64(nbytes), err: err}
  778. break
  779. }
  780. if nbytes < len(pendingData) {
  781. firstErr = offsetErr{offset: writeOffset + uint64(nbytes), err: io.ErrShortWrite}
  782. break
  783. }
  784. writeOffset += uint64(nbytes)
  785. inFlight--
  786. }
  787. } else {
  788. // Don't write the data yet because
  789. // this response came in out of order
  790. // and we need to wait for responses
  791. // for earlier segments of the file.
  792. inFlight++ // Pending writes should still be considered inFlight.
  793. pendingWrites[req.offset] = data
  794. }
  795. default:
  796. firstErr = offsetErr{offset: 0, err: unimplementedPacketErr(res.typ)}
  797. break
  798. }
  799. }
  800. }
  801. if firstErr.err != io.EOF {
  802. return copied, firstErr.err
  803. }
  804. return copied, nil
  805. }
  806. // Stat returns the FileInfo structure describing file. If there is an
  807. // error.
  808. func (f *File) Stat() (os.FileInfo, error) {
  809. fs, err := f.c.fstat(f.handle)
  810. if err != nil {
  811. return nil, err
  812. }
  813. return fileInfoFromStat(fs, path.Base(f.path)), nil
  814. }
  815. // Write writes len(b) bytes to the File. It returns the number of bytes
  816. // written and an error, if any. Write returns a non-nil error when n !=
  817. // len(b).
  818. func (f *File) Write(b []byte) (int, error) {
  819. // Split the write into multiple maxPacket sized concurrent writes
  820. // bounded by maxConcurrentRequests. This allows writes with a suitably
  821. // large buffer to transfer data at a much faster rate due to
  822. // overlapping round trip times.
  823. inFlight := 0
  824. desiredInFlight := 1
  825. offset := f.offset
  826. ch := make(chan result, 1)
  827. var firstErr error
  828. written := len(b)
  829. for len(b) > 0 || inFlight > 0 {
  830. for inFlight < desiredInFlight && len(b) > 0 && firstErr == nil {
  831. l := min(len(b), f.c.maxPacket)
  832. rb := b[:l]
  833. f.c.dispatchRequest(ch, sshFxpWritePacket{
  834. ID: f.c.nextID(),
  835. Handle: f.handle,
  836. Offset: offset,
  837. Length: uint32(len(rb)),
  838. Data: rb,
  839. })
  840. inFlight++
  841. offset += uint64(l)
  842. b = b[l:]
  843. }
  844. if inFlight == 0 {
  845. break
  846. }
  847. select {
  848. case res := <-ch:
  849. inFlight--
  850. if res.err != nil {
  851. firstErr = res.err
  852. break
  853. }
  854. switch res.typ {
  855. case ssh_FXP_STATUS:
  856. id, _ := unmarshalUint32(res.data)
  857. err := normaliseError(unmarshalStatus(id, res.data))
  858. if err != nil && firstErr == nil {
  859. firstErr = err
  860. break
  861. }
  862. if desiredInFlight < maxConcurrentRequests {
  863. desiredInFlight++
  864. }
  865. default:
  866. firstErr = unimplementedPacketErr(res.typ)
  867. break
  868. }
  869. }
  870. }
  871. // If error is non-nil, then there may be gaps in the data written to
  872. // the file so it's best to return 0 so the caller can't make any
  873. // incorrect assumptions about the state of the file.
  874. if firstErr != nil {
  875. written = 0
  876. }
  877. f.offset += uint64(written)
  878. return written, firstErr
  879. }
  880. // ReadFrom reads data from r until EOF and writes it to the file. The return
  881. // value is the number of bytes read. Any error except io.EOF encountered
  882. // during the read is also returned.
  883. func (f *File) ReadFrom(r io.Reader) (int64, error) {
  884. inFlight := 0
  885. desiredInFlight := 1
  886. offset := f.offset
  887. ch := make(chan result, 1)
  888. var firstErr error
  889. read := int64(0)
  890. b := make([]byte, f.c.maxPacket)
  891. for inFlight > 0 || firstErr == nil {
  892. for inFlight < desiredInFlight && firstErr == nil {
  893. n, err := r.Read(b)
  894. if err != nil {
  895. firstErr = err
  896. }
  897. f.c.dispatchRequest(ch, sshFxpWritePacket{
  898. ID: f.c.nextID(),
  899. Handle: f.handle,
  900. Offset: offset,
  901. Length: uint32(n),
  902. Data: b[:n],
  903. })
  904. inFlight++
  905. offset += uint64(n)
  906. read += int64(n)
  907. }
  908. if inFlight == 0 {
  909. break
  910. }
  911. select {
  912. case res := <-ch:
  913. inFlight--
  914. if res.err != nil {
  915. firstErr = res.err
  916. break
  917. }
  918. switch res.typ {
  919. case ssh_FXP_STATUS:
  920. id, _ := unmarshalUint32(res.data)
  921. err := normaliseError(unmarshalStatus(id, res.data))
  922. if err != nil && firstErr == nil {
  923. firstErr = err
  924. break
  925. }
  926. if desiredInFlight < maxConcurrentRequests {
  927. desiredInFlight++
  928. }
  929. default:
  930. firstErr = unimplementedPacketErr(res.typ)
  931. break
  932. }
  933. }
  934. }
  935. if firstErr == io.EOF {
  936. firstErr = nil
  937. }
  938. // If error is non-nil, then there may be gaps in the data written to
  939. // the file so it's best to return 0 so the caller can't make any
  940. // incorrect assumptions about the state of the file.
  941. if firstErr != nil {
  942. read = 0
  943. }
  944. f.offset += uint64(read)
  945. return read, firstErr
  946. }
  947. // Seek implements io.Seeker by setting the client offset for the next Read or
  948. // Write. It returns the next offset read. Seeking before or after the end of
  949. // the file is undefined. Seeking relative to the end calls Stat.
  950. func (f *File) Seek(offset int64, whence int) (int64, error) {
  951. switch whence {
  952. case os.SEEK_SET:
  953. f.offset = uint64(offset)
  954. case os.SEEK_CUR:
  955. f.offset = uint64(int64(f.offset) + offset)
  956. case os.SEEK_END:
  957. fi, err := f.Stat()
  958. if err != nil {
  959. return int64(f.offset), err
  960. }
  961. f.offset = uint64(fi.Size() + offset)
  962. default:
  963. return int64(f.offset), unimplementedSeekWhence(whence)
  964. }
  965. return int64(f.offset), nil
  966. }
  967. // Chown changes the uid/gid of the current file.
  968. func (f *File) Chown(uid, gid int) error {
  969. return f.c.Chown(f.path, uid, gid)
  970. }
  971. // Chmod changes the permissions of the current file.
  972. func (f *File) Chmod(mode os.FileMode) error {
  973. return f.c.Chmod(f.path, mode)
  974. }
  975. // Truncate sets the size of the current file. Although it may be safely assumed
  976. // that if the size is less than its current size it will be truncated to fit,
  977. // the SFTP protocol does not specify what behavior the server should do when setting
  978. // size greater than the current size.
  979. func (f *File) Truncate(size int64) error {
  980. return f.c.Truncate(f.path, size)
  981. }
  982. func min(a, b int) int {
  983. if a > b {
  984. return b
  985. }
  986. return a
  987. }
  988. // normaliseError normalises an error into a more standard form that can be
  989. // checked against stdlib errors like io.EOF or os.ErrNotExist.
  990. func normaliseError(err error) error {
  991. switch err := err.(type) {
  992. case *StatusError:
  993. switch err.Code {
  994. case ssh_FX_EOF:
  995. return io.EOF
  996. case ssh_FX_NO_SUCH_FILE:
  997. return os.ErrNotExist
  998. case ssh_FX_OK:
  999. return nil
  1000. default:
  1001. return err
  1002. }
  1003. default:
  1004. return err
  1005. }
  1006. }
  1007. func unmarshalStatus(id uint32, data []byte) error {
  1008. sid, data := unmarshalUint32(data)
  1009. if sid != id {
  1010. return &unexpectedIDErr{id, sid}
  1011. }
  1012. code, data := unmarshalUint32(data)
  1013. msg, data, err := unmarshalStringSafe(data)
  1014. if err != nil {
  1015. return err
  1016. }
  1017. lang, _, _ := unmarshalStringSafe(data)
  1018. return &StatusError{
  1019. Code: code,
  1020. msg: msg,
  1021. lang: lang,
  1022. }
  1023. }
  1024. func marshalStatus(b []byte, err StatusError) []byte {
  1025. b = marshalUint32(b, err.Code)
  1026. b = marshalString(b, err.msg)
  1027. b = marshalString(b, err.lang)
  1028. return b
  1029. }
  1030. // flags converts the flags passed to OpenFile into ssh flags.
  1031. // Unsupported flags are ignored.
  1032. func flags(f int) uint32 {
  1033. var out uint32
  1034. switch f & os.O_WRONLY {
  1035. case os.O_WRONLY:
  1036. out |= ssh_FXF_WRITE
  1037. case os.O_RDONLY:
  1038. out |= ssh_FXF_READ
  1039. }
  1040. if f&os.O_RDWR == os.O_RDWR {
  1041. out |= ssh_FXF_READ | ssh_FXF_WRITE
  1042. }
  1043. if f&os.O_APPEND == os.O_APPEND {
  1044. out |= ssh_FXF_APPEND
  1045. }
  1046. if f&os.O_CREATE == os.O_CREATE {
  1047. out |= ssh_FXF_CREAT
  1048. }
  1049. if f&os.O_TRUNC == os.O_TRUNC {
  1050. out |= ssh_FXF_TRUNC
  1051. }
  1052. if f&os.O_EXCL == os.O_EXCL {
  1053. out |= ssh_FXF_EXCL
  1054. }
  1055. return out
  1056. }