| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 | 
							- // Copyright 2016 The etcd Authors
 
- //
 
- // Licensed under the Apache License, Version 2.0 (the "License");
 
- // you may not use this file except in compliance with the License.
 
- // You may obtain a copy of the License at
 
- //
 
- //     http://www.apache.org/licenses/LICENSE-2.0
 
- //
 
- // Unless required by applicable law or agreed to in writing, software
 
- // distributed under the License is distributed on an "AS IS" BASIS,
 
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
- // See the License for the specific language governing permissions and
 
- // limitations under the License.
 
- package integration
 
- import (
 
- 	"fmt"
 
- 	"io"
 
- 	"io/ioutil"
 
- 	"net"
 
- 	"sync"
 
- 	"go.etcd.io/etcd/pkg/transport"
 
- )
 
- // bridge creates a unix socket bridge to another unix socket, making it possible
 
- // to disconnect grpc network connections without closing the logical grpc connection.
 
- type bridge struct {
 
- 	inaddr  string
 
- 	outaddr string
 
- 	l       net.Listener
 
- 	conns   map[*bridgeConn]struct{}
 
- 	stopc      chan struct{}
 
- 	pausec     chan struct{}
 
- 	blackholec chan struct{}
 
- 	wg         sync.WaitGroup
 
- 	mu sync.Mutex
 
- }
 
- func newBridge(addr string) (*bridge, error) {
 
- 	b := &bridge{
 
- 		// bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number
 
- 		inaddr:     addr + "0",
 
- 		outaddr:    addr,
 
- 		conns:      make(map[*bridgeConn]struct{}),
 
- 		stopc:      make(chan struct{}),
 
- 		pausec:     make(chan struct{}),
 
- 		blackholec: make(chan struct{}),
 
- 	}
 
- 	close(b.pausec)
 
- 	l, err := transport.NewUnixListener(b.inaddr)
 
- 	if err != nil {
 
- 		return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
 
- 	}
 
- 	b.l = l
 
- 	b.wg.Add(1)
 
- 	go b.serveListen()
 
- 	return b, nil
 
- }
 
- func (b *bridge) URL() string { return "unix://" + b.inaddr }
 
- func (b *bridge) Close() {
 
- 	b.l.Close()
 
- 	b.mu.Lock()
 
- 	select {
 
- 	case <-b.stopc:
 
- 	default:
 
- 		close(b.stopc)
 
- 	}
 
- 	b.mu.Unlock()
 
- 	b.wg.Wait()
 
- }
 
- func (b *bridge) Reset() {
 
- 	b.mu.Lock()
 
- 	defer b.mu.Unlock()
 
- 	for bc := range b.conns {
 
- 		bc.Close()
 
- 	}
 
- 	b.conns = make(map[*bridgeConn]struct{})
 
- }
 
- func (b *bridge) Pause() {
 
- 	b.mu.Lock()
 
- 	b.pausec = make(chan struct{})
 
- 	b.mu.Unlock()
 
- }
 
- func (b *bridge) Unpause() {
 
- 	b.mu.Lock()
 
- 	select {
 
- 	case <-b.pausec:
 
- 	default:
 
- 		close(b.pausec)
 
- 	}
 
- 	b.mu.Unlock()
 
- }
 
- func (b *bridge) serveListen() {
 
- 	defer func() {
 
- 		b.l.Close()
 
- 		b.mu.Lock()
 
- 		for bc := range b.conns {
 
- 			bc.Close()
 
- 		}
 
- 		b.mu.Unlock()
 
- 		b.wg.Done()
 
- 	}()
 
- 	for {
 
- 		inc, ierr := b.l.Accept()
 
- 		if ierr != nil {
 
- 			return
 
- 		}
 
- 		b.mu.Lock()
 
- 		pausec := b.pausec
 
- 		b.mu.Unlock()
 
- 		select {
 
- 		case <-b.stopc:
 
- 			inc.Close()
 
- 			return
 
- 		case <-pausec:
 
- 		}
 
- 		outc, oerr := net.Dial("unix", b.outaddr)
 
- 		if oerr != nil {
 
- 			inc.Close()
 
- 			return
 
- 		}
 
- 		bc := &bridgeConn{inc, outc, make(chan struct{})}
 
- 		b.wg.Add(1)
 
- 		b.mu.Lock()
 
- 		b.conns[bc] = struct{}{}
 
- 		go b.serveConn(bc)
 
- 		b.mu.Unlock()
 
- 	}
 
- }
 
- func (b *bridge) serveConn(bc *bridgeConn) {
 
- 	defer func() {
 
- 		close(bc.donec)
 
- 		bc.Close()
 
- 		b.mu.Lock()
 
- 		delete(b.conns, bc)
 
- 		b.mu.Unlock()
 
- 		b.wg.Done()
 
- 	}()
 
- 	var wg sync.WaitGroup
 
- 	wg.Add(2)
 
- 	go func() {
 
- 		b.ioCopy(bc.out, bc.in)
 
- 		bc.close()
 
- 		wg.Done()
 
- 	}()
 
- 	go func() {
 
- 		b.ioCopy(bc.in, bc.out)
 
- 		bc.close()
 
- 		wg.Done()
 
- 	}()
 
- 	wg.Wait()
 
- }
 
// bridgeConn is one proxied connection: the socket accepted by the bridge
// paired with the socket the bridge dialed for it.
type bridgeConn struct {
	// in is the accepted connection; out is the connection dialed to the
	// bridge's outaddr.
	in, out net.Conn

	// donec is closed by serveConn when it has finished with this pair;
	// Close waits on it.
	donec chan struct{}
}
 
- func (bc *bridgeConn) Close() {
 
- 	bc.close()
 
- 	<-bc.donec
 
- }
 
- func (bc *bridgeConn) close() {
 
- 	bc.in.Close()
 
- 	bc.out.Close()
 
- }
 
- func (b *bridge) Blackhole() {
 
- 	b.mu.Lock()
 
- 	close(b.blackholec)
 
- 	b.mu.Unlock()
 
- }
 
- func (b *bridge) Unblackhole() {
 
- 	b.mu.Lock()
 
- 	for bc := range b.conns {
 
- 		bc.Close()
 
- 	}
 
- 	b.conns = make(map[*bridgeConn]struct{})
 
- 	b.blackholec = make(chan struct{})
 
- 	b.mu.Unlock()
 
- }
 
- // ref. https://github.com/golang/go/blob/master/src/io/io.go copyBuffer
 
- func (b *bridge) ioCopy(dst io.Writer, src io.Reader) (err error) {
 
- 	buf := make([]byte, 32*1024)
 
- 	for {
 
- 		select {
 
- 		case <-b.blackholec:
 
- 			io.Copy(ioutil.Discard, src)
 
- 			return nil
 
- 		default:
 
- 		}
 
- 		nr, er := src.Read(buf)
 
- 		if nr > 0 {
 
- 			nw, ew := dst.Write(buf[0:nr])
 
- 			if ew != nil {
 
- 				return ew
 
- 			}
 
- 			if nr != nw {
 
- 				return io.ErrShortWrite
 
- 			}
 
- 		}
 
- 		if er != nil {
 
- 			err = er
 
- 			break
 
- 		}
 
- 	}
 
- 	return err
 
- }
 
 
  |