| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 | // Copyright 2016 The etcd Authors//// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at////     http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.// +build cluster_proxypackage integrationimport (	"sync"	"go.etcd.io/etcd/clientv3"	"go.etcd.io/etcd/clientv3/namespace"	"go.etcd.io/etcd/proxy/grpcproxy"	"go.etcd.io/etcd/proxy/grpcproxy/adapter")var (	pmu     sync.Mutex	proxies map[*clientv3.Client]grpcClientProxy = make(map[*clientv3.Client]grpcClientProxy))const proxyNamespace = "proxy-namespace"type grpcClientProxy struct {	grpc    grpcAPI	wdonec  <-chan struct{}	kvdonec <-chan struct{}	lpdonec <-chan struct{}}func toGRPC(c *clientv3.Client) grpcAPI {	pmu.Lock()	defer pmu.Unlock()	if v, ok := proxies[c]; ok {		return v.grpc	}	// test namespacing proxy	c.KV = namespace.NewKV(c.KV, proxyNamespace)	c.Watcher = namespace.NewWatcher(c.Watcher, proxyNamespace)	c.Lease = namespace.NewLease(c.Lease, proxyNamespace)	// test coalescing/caching proxy	kvp, kvpch := grpcproxy.NewKvProxy(c)	wp, wpch := grpcproxy.NewWatchProxy(c)	lp, lpch := grpcproxy.NewLeaseProxy(c)	mp := grpcproxy.NewMaintenanceProxy(c)	clp, _ := grpcproxy.NewClusterProxy(c, "", "") // without registering proxy URLs	authp := grpcproxy.NewAuthProxy(c)	lockp := grpcproxy.NewLockProxy(c)	electp := grpcproxy.NewElectionProxy(c)	grpc := grpcAPI{		adapter.ClusterServerToClusterClient(clp),		adapter.KvServerToKvClient(kvp),		adapter.LeaseServerToLeaseClient(lp),		adapter.WatchServerToWatchClient(wp),		adapter.MaintenanceServerToMaintenanceClient(mp),		adapter.AuthServerToAuthClient(authp),		adapter.LockServerToLockClient(lockp),		adapter.ElectionServerToElectionClient(electp),	}	proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch}	return grpc}type proxyCloser struct {	clientv3.Watcher	wdonec  <-chan struct{}	kvdonec <-chan struct{}	lclose  func()	lpdonec <-chan struct{}}func (pc *proxyCloser) Close() error {	// client ctx is canceled before calling close, so kv and lp will close out	<-pc.kvdonec	err := pc.Watcher.Close()	<-pc.wdonec	pc.lclose()	<-pc.lpdonec	return err}func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {	c, err := clientv3.New(cfg)	if err != nil {		return nil, err	}	rpc := toGRPC(c)	c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)	pmu.Lock()	lc := c.Lease	c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, c, cfg.DialTimeout)	c.Watcher = &proxyCloser{		Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c),		wdonec:  proxies[c].wdonec,		kvdonec: proxies[c].kvdonec,		lclose:  func() { lc.Close() },		lpdonec: proxies[c].lpdonec,	}	pmu.Unlock()	return c, nil}
 |