interceptor.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package v3rpc
  15. import (
  16. "context"
  17. "sync"
  18. "time"
  19. "go.etcd.io/etcd/etcdserver"
  20. "go.etcd.io/etcd/etcdserver/api"
  21. "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
  22. "go.etcd.io/etcd/pkg/types"
  23. "go.etcd.io/etcd/raft"
  24. "github.com/coreos/pkg/capnslog"
  25. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  26. "go.uber.org/zap"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/metadata"
  29. "google.golang.org/grpc/peer"
  30. )
  31. const (
  32. maxNoLeaderCnt = 3
  33. )
  34. type streamsMap struct {
  35. mu sync.Mutex
  36. streams map[grpc.ServerStream]struct{}
  37. }
  38. func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
  39. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  40. if !api.IsCapabilityEnabled(api.V3rpcCapability) {
  41. return nil, rpctypes.ErrGRPCNotCapable
  42. }
  43. if s.IsMemberExist(s.ID()) && s.IsLearner() && !isRPCSupportedForLearner(req) {
  44. return nil, rpctypes.ErrGPRCNotSupportedForLearner
  45. }
  46. md, ok := metadata.FromIncomingContext(ctx)
  47. if ok {
  48. if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
  49. if s.Leader() == types.ID(raft.None) {
  50. return nil, rpctypes.ErrGRPCNoLeader
  51. }
  52. }
  53. }
  54. return handler(ctx, req)
  55. }
  56. }
  57. func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
  58. return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  59. startTime := time.Now()
  60. resp, err := handler(ctx, req)
  61. lg := s.Logger()
  62. if (lg != nil && lg.Core().Enabled(zap.DebugLevel)) || // using zap logger and debug level is enabled
  63. (lg == nil && plog.LevelAt(capnslog.DEBUG)) { // or, using capnslog and debug level is enabled
  64. defer logUnaryRequestStats(ctx, lg, info, startTime, req, resp)
  65. }
  66. return resp, err
  67. }
  68. }
  69. func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
  70. duration := time.Since(startTime)
  71. remote := "No remote client info."
  72. peerInfo, ok := peer.FromContext(ctx)
  73. if ok {
  74. remote = peerInfo.Addr.String()
  75. }
  76. responseType := info.FullMethod
  77. var reqCount, respCount int64
  78. var reqSize, respSize int
  79. var reqContent string
  80. switch _resp := resp.(type) {
  81. case *pb.RangeResponse:
  82. _req, ok := req.(*pb.RangeRequest)
  83. if ok {
  84. reqCount = 0
  85. reqSize = _req.Size()
  86. reqContent = _req.String()
  87. }
  88. if _resp != nil {
  89. respCount = _resp.GetCount()
  90. respSize = _resp.Size()
  91. }
  92. case *pb.PutResponse:
  93. _req, ok := req.(*pb.PutRequest)
  94. if ok {
  95. reqCount = 1
  96. reqSize = _req.Size()
  97. reqContent = pb.NewLoggablePutRequest(_req).String()
  98. // redact value field from request content, see PR #9821
  99. }
  100. if _resp != nil {
  101. respCount = 0
  102. respSize = _resp.Size()
  103. }
  104. case *pb.DeleteRangeResponse:
  105. _req, ok := req.(*pb.DeleteRangeRequest)
  106. if ok {
  107. reqCount = 0
  108. reqSize = _req.Size()
  109. reqContent = _req.String()
  110. }
  111. if _resp != nil {
  112. respCount = _resp.GetDeleted()
  113. respSize = _resp.Size()
  114. }
  115. case *pb.TxnResponse:
  116. _req, ok := req.(*pb.TxnRequest)
  117. if ok && _resp != nil {
  118. if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
  119. reqCount = int64(len(_req.GetSuccess()))
  120. reqSize = 0
  121. for _, r := range _req.GetSuccess() {
  122. reqSize += r.Size()
  123. }
  124. } else {
  125. reqCount = int64(len(_req.GetFailure()))
  126. reqSize = 0
  127. for _, r := range _req.GetFailure() {
  128. reqSize += r.Size()
  129. }
  130. }
  131. reqContent = pb.NewLoggableTxnRequest(_req).String()
  132. // redact value field from request content, see PR #9821
  133. }
  134. if _resp != nil {
  135. respCount = 0
  136. respSize = _resp.Size()
  137. }
  138. default:
  139. reqCount = -1
  140. reqSize = -1
  141. respCount = -1
  142. respSize = -1
  143. }
  144. logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
  145. }
  146. func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
  147. reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
  148. if lg == nil {
  149. plog.Debugf("start time = %v, "+
  150. "time spent = %v, "+
  151. "remote = %s, "+
  152. "response type = %s, "+
  153. "request count = %d, "+
  154. "request size = %d, "+
  155. "response count = %d, "+
  156. "response size = %d, "+
  157. "request content = %s",
  158. startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
  159. )
  160. } else {
  161. lg.Debug("request stats",
  162. zap.Time("start time", startTime),
  163. zap.Duration("time spent", duration),
  164. zap.String("remote", remote),
  165. zap.String("response type", responseType),
  166. zap.Int64("request count", reqCount),
  167. zap.Int("request size", reqSize),
  168. zap.Int64("response count", respCount),
  169. zap.Int("response size", respSize),
  170. zap.String("request content", reqContent),
  171. )
  172. }
  173. }
  174. func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
  175. smap := monitorLeader(s)
  176. return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  177. if !api.IsCapabilityEnabled(api.V3rpcCapability) {
  178. return rpctypes.ErrGRPCNotCapable
  179. }
  180. if s.IsMemberExist(s.ID()) && s.IsLearner() { // learner does not support stream RPC
  181. return rpctypes.ErrGPRCNotSupportedForLearner
  182. }
  183. md, ok := metadata.FromIncomingContext(ss.Context())
  184. if ok {
  185. if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
  186. if s.Leader() == types.ID(raft.None) {
  187. return rpctypes.ErrGRPCNoLeader
  188. }
  189. cctx, cancel := context.WithCancel(ss.Context())
  190. ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
  191. smap.mu.Lock()
  192. smap.streams[ss] = struct{}{}
  193. smap.mu.Unlock()
  194. defer func() {
  195. smap.mu.Lock()
  196. delete(smap.streams, ss)
  197. smap.mu.Unlock()
  198. cancel()
  199. }()
  200. }
  201. }
  202. return handler(srv, ss)
  203. }
  204. }
  205. type serverStreamWithCtx struct {
  206. grpc.ServerStream
  207. ctx context.Context
  208. cancel *context.CancelFunc
  209. }
  210. func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
  211. func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
  212. smap := &streamsMap{
  213. streams: make(map[grpc.ServerStream]struct{}),
  214. }
  215. go func() {
  216. election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
  217. noLeaderCnt := 0
  218. for {
  219. select {
  220. case <-s.StopNotify():
  221. return
  222. case <-time.After(election):
  223. if s.Leader() == types.ID(raft.None) {
  224. noLeaderCnt++
  225. } else {
  226. noLeaderCnt = 0
  227. }
  228. // We are more conservative on canceling existing streams. Reconnecting streams
  229. // cost much more than just rejecting new requests. So we wait until the member
  230. // cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
  231. if noLeaderCnt >= maxNoLeaderCnt {
  232. smap.mu.Lock()
  233. for ss := range smap.streams {
  234. if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
  235. (*ssWithCtx.cancel)()
  236. <-ss.Context().Done()
  237. }
  238. }
  239. smap.streams = make(map[grpc.ServerStream]struct{})
  240. smap.mu.Unlock()
  241. }
  242. }
  243. }
  244. }()
  245. return smap
  246. }