123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- // +build linux
- package console
- import (
- "io"
- "os"
- "sync"
- "golang.org/x/sys/unix"
- )
- const (
- maxEvents = 128
- )
- // Epoller manages multiple epoll consoles using edge-triggered epoll api so we
- // dont have to deal with repeated wake-up of EPOLLER or EPOLLHUP.
- // For more details, see:
- // - https://github.com/systemd/systemd/pull/4262
- // - https://github.com/moby/moby/issues/27202
- //
- // Example usage of Epoller and EpollConsole can be as follow:
- //
- // epoller, _ := NewEpoller()
- // epollConsole, _ := epoller.Add(console)
- // go epoller.Wait()
- // var (
- // b bytes.Buffer
- // wg sync.WaitGroup
- // )
- // wg.Add(1)
- // go func() {
- // io.Copy(&b, epollConsole)
- // wg.Done()
- // }()
- // // perform I/O on the console
- // epollConsole.Shutdown(epoller.CloseConsole)
- // wg.Wait()
- // epollConsole.Close()
- type Epoller struct {
- efd int
- mu sync.Mutex
- fdMapping map[int]*EpollConsole
- }
- // NewEpoller returns an instance of epoller with a valid epoll fd.
- func NewEpoller() (*Epoller, error) {
- efd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
- if err != nil {
- return nil, err
- }
- return &Epoller{
- efd: efd,
- fdMapping: make(map[int]*EpollConsole),
- }, nil
- }
- // Add creates a epoll console based on the provided console. The console will
- // be registered with EPOLLET (i.e. using edge-triggered notification) and its
- // file descriptor will be set to non-blocking mode. After this, user should use
- // the return console to perform I/O.
- func (e *Epoller) Add(console Console) (*EpollConsole, error) {
- sysfd := int(console.Fd())
- // Set sysfd to non-blocking mode
- if err := unix.SetNonblock(sysfd, true); err != nil {
- return nil, err
- }
- ev := unix.EpollEvent{
- Events: unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLRDHUP | unix.EPOLLET,
- Fd: int32(sysfd),
- }
- if err := unix.EpollCtl(e.efd, unix.EPOLL_CTL_ADD, sysfd, &ev); err != nil {
- return nil, err
- }
- ef := &EpollConsole{
- Console: console,
- sysfd: sysfd,
- readc: sync.NewCond(&sync.Mutex{}),
- writec: sync.NewCond(&sync.Mutex{}),
- }
- e.mu.Lock()
- e.fdMapping[sysfd] = ef
- e.mu.Unlock()
- return ef, nil
- }
- // Wait starts the loop to wait for its consoles' notifications and signal
- // appropriate console that it can perform I/O.
- func (e *Epoller) Wait() error {
- events := make([]unix.EpollEvent, maxEvents)
- for {
- n, err := unix.EpollWait(e.efd, events, -1)
- if err != nil {
- // EINTR: The call was interrupted by a signal handler before either
- // any of the requested events occurred or the timeout expired
- if err == unix.EINTR {
- continue
- }
- return err
- }
- for i := 0; i < n; i++ {
- ev := &events[i]
- // the console is ready to be read from
- if ev.Events&(unix.EPOLLIN|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
- if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
- epfile.signalRead()
- }
- }
- // the console is ready to be written to
- if ev.Events&(unix.EPOLLOUT|unix.EPOLLHUP|unix.EPOLLERR) != 0 {
- if epfile := e.getConsole(int(ev.Fd)); epfile != nil {
- epfile.signalWrite()
- }
- }
- }
- }
- }
- // Close unregister the console's file descriptor from epoll interface
- func (e *Epoller) CloseConsole(fd int) error {
- e.mu.Lock()
- defer e.mu.Unlock()
- delete(e.fdMapping, fd)
- return unix.EpollCtl(e.efd, unix.EPOLL_CTL_DEL, fd, &unix.EpollEvent{})
- }
- func (e *Epoller) getConsole(sysfd int) *EpollConsole {
- e.mu.Lock()
- f := e.fdMapping[sysfd]
- e.mu.Unlock()
- return f
- }
- // Close the epoll fd
- func (e *Epoller) Close() error {
- return unix.Close(e.efd)
- }
- // EpollConsole acts like a console but register its file descriptor with a
- // epoll fd and uses epoll API to perform I/O.
- type EpollConsole struct {
- Console
- readc *sync.Cond
- writec *sync.Cond
- sysfd int
- closed bool
- }
- // Read reads up to len(p) bytes into p. It returns the number of bytes read
- // (0 <= n <= len(p)) and any error encountered.
- //
- // If the console's read returns EAGAIN or EIO, we assumes that its a
- // temporary error because the other side went away and wait for the signal
- // generated by epoll event to continue.
- func (ec *EpollConsole) Read(p []byte) (n int, err error) {
- var read int
- ec.readc.L.Lock()
- defer ec.readc.L.Unlock()
- for {
- read, err = ec.Console.Read(p[n:])
- n += read
- if err != nil {
- var hangup bool
- if perr, ok := err.(*os.PathError); ok {
- hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
- } else {
- hangup = (err == unix.EAGAIN || err == unix.EIO)
- }
- // if the other end disappear, assume this is temporary and wait for the
- // signal to continue again. Unless we didnt read anything and the
- // console is already marked as closed then we should exit
- if hangup && !(n == 0 && len(p) > 0 && ec.closed) {
- ec.readc.Wait()
- continue
- }
- }
- break
- }
- // if we didnt read anything then return io.EOF to end gracefully
- if n == 0 && len(p) > 0 && err == nil {
- err = io.EOF
- }
- // signal for others that we finished the read
- ec.readc.Signal()
- return n, err
- }
- // Writes len(p) bytes from p to the console. It returns the number of bytes
- // written from p (0 <= n <= len(p)) and any error encountered that caused
- // the write to stop early.
- //
- // If writes to the console returns EAGAIN or EIO, we assumes that its a
- // temporary error because the other side went away and wait for the signal
- // generated by epoll event to continue.
- func (ec *EpollConsole) Write(p []byte) (n int, err error) {
- var written int
- ec.writec.L.Lock()
- defer ec.writec.L.Unlock()
- for {
- written, err = ec.Console.Write(p[n:])
- n += written
- if err != nil {
- var hangup bool
- if perr, ok := err.(*os.PathError); ok {
- hangup = (perr.Err == unix.EAGAIN || perr.Err == unix.EIO)
- } else {
- hangup = (err == unix.EAGAIN || err == unix.EIO)
- }
- // if the other end disappear, assume this is temporary and wait for the
- // signal to continue again.
- if hangup {
- ec.writec.Wait()
- continue
- }
- }
- // unrecoverable error, break the loop and return the error
- break
- }
- if n < len(p) && err == nil {
- err = io.ErrShortWrite
- }
- // signal for others that we finished the write
- ec.writec.Signal()
- return n, err
- }
- // Close closed the file descriptor and signal call waiters for this fd.
- // It accepts a callback which will be called with the console's fd. The
- // callback typically will be used to do further cleanup such as unregister the
- // console's fd from the epoll interface.
- // User should call Shutdown and wait for all I/O operation to be finished
- // before closing the console.
- func (ec *EpollConsole) Shutdown(close func(int) error) error {
- ec.readc.L.Lock()
- defer ec.readc.L.Unlock()
- ec.writec.L.Lock()
- defer ec.writec.L.Unlock()
- ec.readc.Broadcast()
- ec.writec.Broadcast()
- ec.closed = true
- return close(ec.sysfd)
- }
- // signalRead signals that the console is readable.
- func (ec *EpollConsole) signalRead() {
- ec.readc.Signal()
- }
- // signalWrite signals that the console is writable.
- func (ec *EpollConsole) signalWrite() {
- ec.writec.Signal()
- }
|