remote.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rafthttp
  15. import (
  16. "go.etcd.io/etcd/pkg/types"
  17. "go.etcd.io/etcd/raft/raftpb"
  18. "go.uber.org/zap"
  19. )
  20. type remote struct {
  21. lg *zap.Logger
  22. localID types.ID
  23. id types.ID
  24. status *peerStatus
  25. pipeline *pipeline
  26. }
  27. func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
  28. picker := newURLPicker(urls)
  29. status := newPeerStatus(tr.Logger, tr.ID, id)
  30. pipeline := &pipeline{
  31. peerID: id,
  32. tr: tr,
  33. picker: picker,
  34. status: status,
  35. raft: tr.Raft,
  36. errorc: tr.ErrorC,
  37. }
  38. pipeline.start()
  39. return &remote{
  40. lg: tr.Logger,
  41. localID: tr.ID,
  42. id: id,
  43. status: status,
  44. pipeline: pipeline,
  45. }
  46. }
  47. func (g *remote) send(m raftpb.Message) {
  48. select {
  49. case g.pipeline.msgc <- m:
  50. default:
  51. if g.status.isActive() {
  52. if g.lg != nil {
  53. g.lg.Warn(
  54. "dropped internal Raft message since sending buffer is full (overloaded network)",
  55. zap.String("message-type", m.Type.String()),
  56. zap.String("local-member-id", g.localID.String()),
  57. zap.String("from", types.ID(m.From).String()),
  58. zap.String("remote-peer-id", g.id.String()),
  59. zap.Bool("remote-peer-active", g.status.isActive()),
  60. )
  61. } else {
  62. plog.MergeWarningf("dropped internal raft message to %s since sending buffer is full (bad/overloaded network)", g.id)
  63. }
  64. } else {
  65. if g.lg != nil {
  66. g.lg.Warn(
  67. "dropped Raft message since sending buffer is full (overloaded network)",
  68. zap.String("message-type", m.Type.String()),
  69. zap.String("local-member-id", g.localID.String()),
  70. zap.String("from", types.ID(m.From).String()),
  71. zap.String("remote-peer-id", g.id.String()),
  72. zap.Bool("remote-peer-active", g.status.isActive()),
  73. )
  74. } else {
  75. plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
  76. }
  77. }
  78. sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
  79. }
  80. }
  81. func (g *remote) stop() {
  82. g.pipeline.stop()
  83. }
  84. func (g *remote) Pause() {
  85. g.stop()
  86. }
  87. func (g *remote) Resume() {
  88. g.pipeline.start()
  89. }