console_linux.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. // +build linux
  2. package console
  3. import (
  4. "io"
  5. "os"
  6. "sync"
  7. "golang.org/x/sys/unix"
  8. )
  // maxEvents is the length of the event buffer that Epoller.Wait hands to each
// epoll_wait call, i.e. the most events one wake-up can deliver.
const maxEvents = 128
  12. // Epoller manages multiple epoll consoles using edge-triggered epoll api so we
  13. // dont have to deal with repeated wake-up of EPOLLER or EPOLLHUP.
  14. // For more details, see:
  15. // - https://github.com/systemd/systemd/pull/4262
  16. // - https://github.com/moby/moby/issues/27202
  17. //
  18. // Example usage of Epoller and EpollConsole can be as follow:
  19. //
  20. // epoller, _ := NewEpoller()
  21. // epollConsole, _ := epoller.Add(console)
  22. // go epoller.Wait()
  23. // var (
  24. // b bytes.Buffer
  25. // wg sync.WaitGroup
  26. // )
  27. // wg.Add(1)
  28. // go func() {
  29. // io.Copy(&b, epollConsole)
  30. // wg.Done()
  31. // }()
  32. // // perform I/O on the console
  33. // epollConsole.Shutdown(epoller.CloseConsole)
  34. // wg.Wait()
  35. // epollConsole.Close()
  36. type Epoller struct {
  37. efd int
  38. mu sync.Mutex
  39. fdMapping map[int]*EpollConsole
  40. }
  41. // NewEpoller returns an instance of epoller with a valid epoll fd.
  42. func NewEpoller() (*Epoller, error) {
  43. efd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
  44. if err != nil {
  45. return nil, err
  46. }
  47. return &Epoller{
  48. efd: efd,
  49. fdMapping: make(map[int]*EpollConsole),
  50. }, nil
  51. }
  52. // Add creates a epoll console based on the provided console. The console will
  53. // be registered with EPOLLET (i.e. using edge-triggered notification) and its
  54. // file descriptor will be set to non-blocking mode. After this, user should use
  55. // the return console to perform I/O.
  56. func (e *Epoller) Add(console Console) (*EpollConsole, error) {
  57. sysfd := int(console.Fd())
  58. // Set sysfd to non-blocking mode
  59. if err := unix.SetNonblock(sysfd, true); err != nil {
  60. return nil, err
  61. }
  62. ev := unix.EpollEvent{
  63. Events: unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLRDHUP | unix.EPOLLET,
  64. Fd: int32(sysfd),
  65. }
  66. if err := unix.EpollCtl(e.efd, unix.EPOLL_CTL_ADD, sysfd, &ev); err != nil {
  67. return nil, err
  68. }
  69. ef := &EpollConsole{
  70. Console: console,
  71. sysfd: sysfd,
  72. readc: sync.NewCond(&sync.Mutex{}),
  73. writec: sync.NewCond(&sync.Mutex{}),
  74. }
  75. e.mu.Lock()
  76. e.fdMapping[sysfd] = ef
  77. e.mu.Unlock()
  78. return ef, nil
  79. }
  80. // Wait starts the loop to wait for its consoles' notifications and signal
  81. // appropriate console that it can perform I/O.
  82. func (e *Epoller) Wait() error {
  83. events := make([]unix.EpollEvent, maxEvents)
  84. for {
  85. n, err := unix.EpollWait(e.efd, events, -1)
  86. if err != nil {
  87. // EINTR: The call was interrupted by a signal handler before either
  88. // any of the requested events occurred or the timeout expired
  89. if err == unix.EINTR {
  90. continue
  91. }
  92. return err
  93. }
  94. for i := 0; i < n; i++ {
  95. ev := &events[i]
  96. // the console is ready to be read from
  97. if ev.Events&(unix.EPOLLIN|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
  98. if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
  99. epfile.signalRead()
  100. }
  101. }
  102. // the console is ready to be written to
  103. if ev.Events&(unix.EPOLLOUT|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
  104. if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
  105. epfile.signalWrite()
  106. }
  107. }
  108. }
  109. }
  110. }
  111. // Close unregister the console's file descriptor from epoll interface
  112. func (e *Epoller) CloseConsole(fd int) error {
  113. e.mu.Lock()
  114. defer e.mu.Unlock()
  115. delete(e.fdMapping, fd)
  116. return unix.EpollCtl(e.efd, unix.EPOLL_CTL_DEL, fd, &unix.EpollEvent{})
  117. }
  118. func (e *Epoller) getConsole(sysfd int) *EpollConsole {
  119. e.mu.Lock()
  120. f := e.fdMapping[sysfd]
  121. e.mu.Unlock()
  122. return f
  123. }
  124. // Close the epoll fd
  125. func (e *Epoller) Close() error {
  126. return unix.Close(e.efd)
  127. }
  128. // EpollConsole acts like a console but register its file descriptor with a
  129. // epoll fd and uses epoll API to perform I/O.
  130. type EpollConsole struct {
  131. Console
  132. readc *sync.Cond
  133. writec *sync.Cond
  134. sysfd int
  135. closed bool
  136. }
  137. // Read reads up to len(p) bytes into p. It returns the number of bytes read
  138. // (0 <= n <= len(p)) and any error encountered.
  139. //
  140. // If the console's read returns EAGAIN or EIO, we assumes that its a
  141. // temporary error because the other side went away and wait for the signal
  142. // generated by epoll event to continue.
  143. func (ec *EpollConsole) Read(p []byte) (n int, err error) {
  144. var read int
  145. ec.readc.L.Lock()
  146. defer ec.readc.L.Unlock()
  147. for {
  148. read, err = ec.Console.Read(p[n:])
  149. n += read
  150. if err != nil {
  151. var hangup bool
  152. if perr, ok := err.(*os.PathError); ok {
  153. hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
  154. } else {
  155. hangup = (err == unix.EAGAIN || err == unix.EIO)
  156. }
  157. // if the other end disappear, assume this is temporary and wait for the
  158. // signal to continue again. Unless we didnt read anything and the
  159. // console is already marked as closed then we should exit
  160. if hangup && !(n == 0 && len(p) > 0 && ec.closed) {
  161. ec.readc.Wait()
  162. continue
  163. }
  164. }
  165. break
  166. }
  167. // if we didnt read anything then return io.EOF to end gracefully
  168. if n == 0 && len(p) > 0 && err == nil {
  169. err = io.EOF
  170. }
  171. // signal for others that we finished the read
  172. ec.readc.Signal()
  173. return n, err
  174. }
  175. // Writes len(p) bytes from p to the console. It returns the number of bytes
  176. // written from p (0 <= n <= len(p)) and any error encountered that caused
  177. // the write to stop early.
  178. //
  179. // If writes to the console returns EAGAIN or EIO, we assumes that its a
  180. // temporary error because the other side went away and wait for the signal
  181. // generated by epoll event to continue.
  182. func (ec *EpollConsole) Write(p []byte) (n int, err error) {
  183. var written int
  184. ec.writec.L.Lock()
  185. defer ec.writec.L.Unlock()
  186. for {
  187. written, err = ec.Console.Write(p[n:])
  188. n += written
  189. if err != nil {
  190. var hangup bool
  191. if perr, ok := err.(*os.PathError); ok {
  192. hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
  193. } else {
  194. hangup = (err == unix.EAGAIN || err == unix.EIO)
  195. }
  196. // if the other end disappear, assume this is temporary and wait for the
  197. // signal to continue again.
  198. if hangup {
  199. ec.writec.Wait()
  200. continue
  201. }
  202. }
  203. // unrecoverable error, break the loop and return the error
  204. break
  205. }
  206. if n < len(p) && err == nil {
  207. err = io.ErrShortWrite
  208. }
  209. // signal for others that we finished the write
  210. ec.writec.Signal()
  211. return n, err
  212. }
  213. // Close closed the file descriptor and signal call waiters for this fd.
  214. // It accepts a callback which will be called with the console's fd. The
  215. // callback typically will be used to do further cleanup such as unregister the
  216. // console's fd from the epoll interface.
  217. // User should call Shutdown and wait for all I/O operation to be finished
  218. // before closing the console.
  219. func (ec *EpollConsole) Shutdown(close func(int) error) error {
  220. ec.readc.L.Lock()
  221. defer ec.readc.L.Unlock()
  222. ec.writec.L.Lock()
  223. defer ec.writec.L.Unlock()
  224. ec.readc.Broadcast()
  225. ec.writec.Broadcast()
  226. ec.closed = true
  227. return close(ec.sysfd)
  228. }
  229. // signalRead signals that the console is readable.
  230. func (ec *EpollConsole) signalRead() {
  231. ec.readc.Signal()
  232. }
  233. // signalWrite signals that the console is writable.
  234. func (ec *EpollConsole) signalWrite() {
  235. ec.writec.Signal()
  236. }