pipe.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. // +build windows
  2. package winio
  3. import (
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "os"
  10. "runtime"
  11. "syscall"
  12. "time"
  13. "unsafe"
  14. )
  15. //sys connectNamedPipe(pipe syscall.Handle, o *syscall.Overlapped) (err error) = ConnectNamedPipe
  16. //sys createNamedPipe(name string, flags uint32, pipeMode uint32, maxInstances uint32, outSize uint32, inSize uint32, defaultTimeout uint32, sa *syscall.SecurityAttributes) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateNamedPipeW
  17. //sys createFile(name string, access uint32, mode uint32, sa *syscall.SecurityAttributes, createmode uint32, attrs uint32, templatefile syscall.Handle) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateFileW
  18. //sys getNamedPipeInfo(pipe syscall.Handle, flags *uint32, outSize *uint32, inSize *uint32, maxInstances *uint32) (err error) = GetNamedPipeInfo
  19. //sys getNamedPipeHandleState(pipe syscall.Handle, state *uint32, curInstances *uint32, maxCollectionCount *uint32, collectDataTimeout *uint32, userName *uint16, maxUserNameSize uint32) (err error) = GetNamedPipeHandleStateW
  20. //sys localAlloc(uFlags uint32, length uint32) (ptr uintptr) = LocalAlloc
  21. //sys ntCreateNamedPipeFile(pipe *syscall.Handle, access uint32, oa *objectAttributes, iosb *ioStatusBlock, share uint32, disposition uint32, options uint32, typ uint32, readMode uint32, completionMode uint32, maxInstances uint32, inboundQuota uint32, outputQuota uint32, timeout *int64) (status ntstatus) = ntdll.NtCreateNamedPipeFile
  22. //sys rtlNtStatusToDosError(status ntstatus) (winerr error) = ntdll.RtlNtStatusToDosErrorNoTeb
  23. //sys rtlDosPathNameToNtPathName(name *uint16, ntName *unicodeString, filePart uintptr, reserved uintptr) (status ntstatus) = ntdll.RtlDosPathNameToNtPathName_U
  24. //sys rtlDefaultNpAcl(dacl *uintptr) (status ntstatus) = ntdll.RtlDefaultNpAcl
  25. type ioStatusBlock struct {
  26. Status, Information uintptr
  27. }
  28. type objectAttributes struct {
  29. Length uintptr
  30. RootDirectory uintptr
  31. ObjectName *unicodeString
  32. Attributes uintptr
  33. SecurityDescriptor *securityDescriptor
  34. SecurityQoS uintptr
  35. }
  36. type unicodeString struct {
  37. Length uint16
  38. MaximumLength uint16
  39. Buffer uintptr
  40. }
  41. type securityDescriptor struct {
  42. Revision byte
  43. Sbz1 byte
  44. Control uint16
  45. Owner uintptr
  46. Group uintptr
  47. Sacl uintptr
  48. Dacl uintptr
  49. }
  50. type ntstatus int32
  51. func (status ntstatus) Err() error {
  52. if status >= 0 {
  53. return nil
  54. }
  55. return rtlNtStatusToDosError(status)
  56. }
  57. const (
  58. cERROR_PIPE_BUSY = syscall.Errno(231)
  59. cERROR_NO_DATA = syscall.Errno(232)
  60. cERROR_PIPE_CONNECTED = syscall.Errno(535)
  61. cERROR_SEM_TIMEOUT = syscall.Errno(121)
  62. cSECURITY_SQOS_PRESENT = 0x100000
  63. cSECURITY_ANONYMOUS = 0
  64. cPIPE_TYPE_MESSAGE = 4
  65. cPIPE_READMODE_MESSAGE = 2
  66. cFILE_OPEN = 1
  67. cFILE_CREATE = 2
  68. cFILE_PIPE_MESSAGE_TYPE = 1
  69. cFILE_PIPE_REJECT_REMOTE_CLIENTS = 2
  70. cSE_DACL_PRESENT = 4
  71. )
  72. var (
  73. // ErrPipeListenerClosed is returned for pipe operations on listeners that have been closed.
  74. // This error should match net.errClosing since docker takes a dependency on its text.
  75. ErrPipeListenerClosed = errors.New("use of closed network connection")
  76. errPipeWriteClosed = errors.New("pipe has been closed for write")
  77. )
  78. type win32Pipe struct {
  79. *win32File
  80. path string
  81. }
  82. type win32MessageBytePipe struct {
  83. win32Pipe
  84. writeClosed bool
  85. readEOF bool
  86. }
  87. type pipeAddress string
  88. func (f *win32Pipe) LocalAddr() net.Addr {
  89. return pipeAddress(f.path)
  90. }
  91. func (f *win32Pipe) RemoteAddr() net.Addr {
  92. return pipeAddress(f.path)
  93. }
  94. func (f *win32Pipe) SetDeadline(t time.Time) error {
  95. f.SetReadDeadline(t)
  96. f.SetWriteDeadline(t)
  97. return nil
  98. }
  99. // CloseWrite closes the write side of a message pipe in byte mode.
  100. func (f *win32MessageBytePipe) CloseWrite() error {
  101. if f.writeClosed {
  102. return errPipeWriteClosed
  103. }
  104. err := f.win32File.Flush()
  105. if err != nil {
  106. return err
  107. }
  108. _, err = f.win32File.Write(nil)
  109. if err != nil {
  110. return err
  111. }
  112. f.writeClosed = true
  113. return nil
  114. }
  115. // Write writes bytes to a message pipe in byte mode. Zero-byte writes are ignored, since
  116. // they are used to implement CloseWrite().
  117. func (f *win32MessageBytePipe) Write(b []byte) (int, error) {
  118. if f.writeClosed {
  119. return 0, errPipeWriteClosed
  120. }
  121. if len(b) == 0 {
  122. return 0, nil
  123. }
  124. return f.win32File.Write(b)
  125. }
  126. // Read reads bytes from a message pipe in byte mode. A read of a zero-byte message on a message
  127. // mode pipe will return io.EOF, as will all subsequent reads.
  128. func (f *win32MessageBytePipe) Read(b []byte) (int, error) {
  129. if f.readEOF {
  130. return 0, io.EOF
  131. }
  132. n, err := f.win32File.Read(b)
  133. if err == io.EOF {
  134. // If this was the result of a zero-byte read, then
  135. // it is possible that the read was due to a zero-size
  136. // message. Since we are simulating CloseWrite with a
  137. // zero-byte message, ensure that all future Read() calls
  138. // also return EOF.
  139. f.readEOF = true
  140. } else if err == syscall.ERROR_MORE_DATA {
  141. // ERROR_MORE_DATA indicates that the pipe's read mode is message mode
  142. // and the message still has more bytes. Treat this as a success, since
  143. // this package presents all named pipes as byte streams.
  144. err = nil
  145. }
  146. return n, err
  147. }
  148. func (s pipeAddress) Network() string {
  149. return "pipe"
  150. }
  151. func (s pipeAddress) String() string {
  152. return string(s)
  153. }
  154. // tryDialPipe attempts to dial the pipe at `path` until `ctx` cancellation or timeout.
  155. func tryDialPipe(ctx context.Context, path *string) (syscall.Handle, error) {
  156. for {
  157. select {
  158. case <-ctx.Done():
  159. return syscall.Handle(0), ctx.Err()
  160. default:
  161. h, err := createFile(*path, syscall.GENERIC_READ|syscall.GENERIC_WRITE, 0, nil, syscall.OPEN_EXISTING, syscall.FILE_FLAG_OVERLAPPED|cSECURITY_SQOS_PRESENT|cSECURITY_ANONYMOUS, 0)
  162. if err == nil {
  163. return h, nil
  164. }
  165. if err != cERROR_PIPE_BUSY {
  166. return h, &os.PathError{Err: err, Op: "open", Path: *path}
  167. }
  168. // Wait 10 msec and try again. This is a rather simplistic
  169. // view, as we always try each 10 milliseconds.
  170. time.Sleep(time.Millisecond * 10)
  171. }
  172. }
  173. }
  174. // DialPipe connects to a named pipe by path, timing out if the connection
  175. // takes longer than the specified duration. If timeout is nil, then we use
  176. // a default timeout of 2 seconds. (We do not use WaitNamedPipe.)
  177. func DialPipe(path string, timeout *time.Duration) (net.Conn, error) {
  178. var absTimeout time.Time
  179. if timeout != nil {
  180. absTimeout = time.Now().Add(*timeout)
  181. } else {
  182. absTimeout = time.Now().Add(time.Second * 2)
  183. }
  184. ctx, _ := context.WithDeadline(context.Background(), absTimeout)
  185. conn, err := DialPipeContext(ctx, path)
  186. if err == context.DeadlineExceeded {
  187. return nil, ErrTimeout
  188. }
  189. return conn, err
  190. }
  191. // DialPipeContext attempts to connect to a named pipe by `path` until `ctx`
  192. // cancellation or timeout.
  193. func DialPipeContext(ctx context.Context, path string) (net.Conn, error) {
  194. var err error
  195. var h syscall.Handle
  196. h, err = tryDialPipe(ctx, &path)
  197. if err != nil {
  198. return nil, err
  199. }
  200. var flags uint32
  201. err = getNamedPipeInfo(h, &flags, nil, nil, nil)
  202. if err != nil {
  203. return nil, err
  204. }
  205. f, err := makeWin32File(h)
  206. if err != nil {
  207. syscall.Close(h)
  208. return nil, err
  209. }
  210. // If the pipe is in message mode, return a message byte pipe, which
  211. // supports CloseWrite().
  212. if flags&cPIPE_TYPE_MESSAGE != 0 {
  213. return &win32MessageBytePipe{
  214. win32Pipe: win32Pipe{win32File: f, path: path},
  215. }, nil
  216. }
  217. return &win32Pipe{win32File: f, path: path}, nil
  218. }
  219. type acceptResponse struct {
  220. f *win32File
  221. err error
  222. }
  223. type win32PipeListener struct {
  224. firstHandle syscall.Handle
  225. path string
  226. config PipeConfig
  227. acceptCh chan (chan acceptResponse)
  228. closeCh chan int
  229. doneCh chan int
  230. }
  231. func makeServerPipeHandle(path string, sd []byte, c *PipeConfig, first bool) (syscall.Handle, error) {
  232. path16, err := syscall.UTF16FromString(path)
  233. if err != nil {
  234. return 0, &os.PathError{Op: "open", Path: path, Err: err}
  235. }
  236. var oa objectAttributes
  237. oa.Length = unsafe.Sizeof(oa)
  238. var ntPath unicodeString
  239. if err := rtlDosPathNameToNtPathName(&path16[0], &ntPath, 0, 0).Err(); err != nil {
  240. return 0, &os.PathError{Op: "open", Path: path, Err: err}
  241. }
  242. defer localFree(ntPath.Buffer)
  243. oa.ObjectName = &ntPath
  244. // The security descriptor is only needed for the first pipe.
  245. if first {
  246. if sd != nil {
  247. len := uint32(len(sd))
  248. sdb := localAlloc(0, len)
  249. defer localFree(sdb)
  250. copy((*[0xffff]byte)(unsafe.Pointer(sdb))[:], sd)
  251. oa.SecurityDescriptor = (*securityDescriptor)(unsafe.Pointer(sdb))
  252. } else {
  253. // Construct the default named pipe security descriptor.
  254. var dacl uintptr
  255. if err := rtlDefaultNpAcl(&dacl).Err(); err != nil {
  256. return 0, fmt.Errorf("getting default named pipe ACL: %s", err)
  257. }
  258. defer localFree(dacl)
  259. sdb := &securityDescriptor{
  260. Revision: 1,
  261. Control: cSE_DACL_PRESENT,
  262. Dacl: dacl,
  263. }
  264. oa.SecurityDescriptor = sdb
  265. }
  266. }
  267. typ := uint32(cFILE_PIPE_REJECT_REMOTE_CLIENTS)
  268. if c.MessageMode {
  269. typ |= cFILE_PIPE_MESSAGE_TYPE
  270. }
  271. disposition := uint32(cFILE_OPEN)
  272. access := uint32(syscall.GENERIC_READ | syscall.GENERIC_WRITE | syscall.SYNCHRONIZE)
  273. if first {
  274. disposition = cFILE_CREATE
  275. // By not asking for read or write access, the named pipe file system
  276. // will put this pipe into an initially disconnected state, blocking
  277. // client connections until the next call with first == false.
  278. access = syscall.SYNCHRONIZE
  279. }
  280. timeout := int64(-50 * 10000) // 50ms
  281. var (
  282. h syscall.Handle
  283. iosb ioStatusBlock
  284. )
  285. err = ntCreateNamedPipeFile(&h, access, &oa, &iosb, syscall.FILE_SHARE_READ|syscall.FILE_SHARE_WRITE, disposition, 0, typ, 0, 0, 0xffffffff, uint32(c.InputBufferSize), uint32(c.OutputBufferSize), &timeout).Err()
  286. if err != nil {
  287. return 0, &os.PathError{Op: "open", Path: path, Err: err}
  288. }
  289. runtime.KeepAlive(ntPath)
  290. return h, nil
  291. }
  292. func (l *win32PipeListener) makeServerPipe() (*win32File, error) {
  293. h, err := makeServerPipeHandle(l.path, nil, &l.config, false)
  294. if err != nil {
  295. return nil, err
  296. }
  297. f, err := makeWin32File(h)
  298. if err != nil {
  299. syscall.Close(h)
  300. return nil, err
  301. }
  302. return f, nil
  303. }
  304. func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) {
  305. p, err := l.makeServerPipe()
  306. if err != nil {
  307. return nil, err
  308. }
  309. // Wait for the client to connect.
  310. ch := make(chan error)
  311. go func(p *win32File) {
  312. ch <- connectPipe(p)
  313. }(p)
  314. select {
  315. case err = <-ch:
  316. if err != nil {
  317. p.Close()
  318. p = nil
  319. }
  320. case <-l.closeCh:
  321. // Abort the connect request by closing the handle.
  322. p.Close()
  323. p = nil
  324. err = <-ch
  325. if err == nil || err == ErrFileClosed {
  326. err = ErrPipeListenerClosed
  327. }
  328. }
  329. return p, err
  330. }
  331. func (l *win32PipeListener) listenerRoutine() {
  332. closed := false
  333. for !closed {
  334. select {
  335. case <-l.closeCh:
  336. closed = true
  337. case responseCh := <-l.acceptCh:
  338. var (
  339. p *win32File
  340. err error
  341. )
  342. for {
  343. p, err = l.makeConnectedServerPipe()
  344. // If the connection was immediately closed by the client, try
  345. // again.
  346. if err != cERROR_NO_DATA {
  347. break
  348. }
  349. }
  350. responseCh <- acceptResponse{p, err}
  351. closed = err == ErrPipeListenerClosed
  352. }
  353. }
  354. syscall.Close(l.firstHandle)
  355. l.firstHandle = 0
  356. // Notify Close() and Accept() callers that the handle has been closed.
  357. close(l.doneCh)
  358. }
  359. // PipeConfig contain configuration for the pipe listener.
  360. type PipeConfig struct {
  361. // SecurityDescriptor contains a Windows security descriptor in SDDL format.
  362. SecurityDescriptor string
  363. // MessageMode determines whether the pipe is in byte or message mode. In either
  364. // case the pipe is read in byte mode by default. The only practical difference in
  365. // this implementation is that CloseWrite() is only supported for message mode pipes;
  366. // CloseWrite() is implemented as a zero-byte write, but zero-byte writes are only
  367. // transferred to the reader (and returned as io.EOF in this implementation)
  368. // when the pipe is in message mode.
  369. MessageMode bool
  370. // InputBufferSize specifies the size the input buffer, in bytes.
  371. InputBufferSize int32
  372. // OutputBufferSize specifies the size the input buffer, in bytes.
  373. OutputBufferSize int32
  374. }
  375. // ListenPipe creates a listener on a Windows named pipe path, e.g. \\.\pipe\mypipe.
  376. // The pipe must not already exist.
  377. func ListenPipe(path string, c *PipeConfig) (net.Listener, error) {
  378. var (
  379. sd []byte
  380. err error
  381. )
  382. if c == nil {
  383. c = &PipeConfig{}
  384. }
  385. if c.SecurityDescriptor != "" {
  386. sd, err = SddlToSecurityDescriptor(c.SecurityDescriptor)
  387. if err != nil {
  388. return nil, err
  389. }
  390. }
  391. h, err := makeServerPipeHandle(path, sd, c, true)
  392. if err != nil {
  393. return nil, err
  394. }
  395. l := &win32PipeListener{
  396. firstHandle: h,
  397. path: path,
  398. config: *c,
  399. acceptCh: make(chan (chan acceptResponse)),
  400. closeCh: make(chan int),
  401. doneCh: make(chan int),
  402. }
  403. go l.listenerRoutine()
  404. return l, nil
  405. }
  406. func connectPipe(p *win32File) error {
  407. c, err := p.prepareIo()
  408. if err != nil {
  409. return err
  410. }
  411. defer p.wg.Done()
  412. err = connectNamedPipe(p.handle, &c.o)
  413. _, err = p.asyncIo(c, nil, 0, err)
  414. if err != nil && err != cERROR_PIPE_CONNECTED {
  415. return err
  416. }
  417. return nil
  418. }
  419. func (l *win32PipeListener) Accept() (net.Conn, error) {
  420. ch := make(chan acceptResponse)
  421. select {
  422. case l.acceptCh <- ch:
  423. response := <-ch
  424. err := response.err
  425. if err != nil {
  426. return nil, err
  427. }
  428. if l.config.MessageMode {
  429. return &win32MessageBytePipe{
  430. win32Pipe: win32Pipe{win32File: response.f, path: l.path},
  431. }, nil
  432. }
  433. return &win32Pipe{win32File: response.f, path: l.path}, nil
  434. case <-l.doneCh:
  435. return nil, ErrPipeListenerClosed
  436. }
  437. }
  438. func (l *win32PipeListener) Close() error {
  439. select {
  440. case l.closeCh <- 1:
  441. <-l.doneCh
  442. case <-l.doneCh:
  443. }
  444. return nil
  445. }
  446. func (l *win32PipeListener) Addr() net.Addr {
  447. return pipeAddress(l.path)
  448. }