etcd-version-monitor.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "bytes"
  16. "encoding/json"
  17. "errors"
  18. goflag "flag"
  19. "fmt"
  20. "net/http"
  21. "time"
  22. "github.com/gogo/protobuf/proto"
  23. dto "github.com/prometheus/client_model/go"
  24. "github.com/prometheus/common/expfmt"
  25. "github.com/spf13/pflag"
  26. "k8s.io/component-base/metrics"
  27. "k8s.io/klog"
  28. )
  29. // Initialize the prometheus instrumentation and client related flags.
  30. var (
  31. listenAddress string
  32. metricsPath string
  33. etcdVersionScrapeURI string
  34. etcdMetricsScrapeURI string
  35. scrapeTimeout time.Duration
  36. )
  37. func registerFlags(fs *pflag.FlagSet) {
  38. fs.StringVar(&listenAddress, "listen-address", "localhost:9101", "Address to listen on for serving prometheus metrics")
  39. fs.StringVar(&metricsPath, "metrics-path", "/metrics", "Path under which prometheus metrics are to be served")
  40. fs.StringVar(&etcdVersionScrapeURI, "etcd-version-scrape-uri", "http://localhost:2379/version", "URI to scrape etcd version info")
  41. fs.StringVar(&etcdMetricsScrapeURI, "etcd-metrics-scrape-uri", "http://localhost:2379/metrics", "URI to scrape etcd metrics")
  42. fs.DurationVar(&scrapeTimeout, "scrape-timeout", 15*time.Second, "Timeout for trying to get stats from etcd")
  43. }
  44. const (
  45. namespace = "etcd" // For prefixing prometheus metrics
  46. )
// Initialize prometheus metrics to be exported.
var (
	// Register all custom metrics with a dedicated registry to keep them separate.
	// Its contents are appended to the rewritten etcd metrics by monitorGatherer.Gather.
	customMetricRegistry = metrics.NewKubeRegistry()
	// Custom etcd version metric since etcd 3.2- does not export one.
	// This will be replaced by https://github.com/coreos/etcd/pull/8960 in etcd 3.3.
	// The version is carried by the binary_version label; getVersion keeps a
	// single series by deleting the previous version's series on change.
	etcdVersion = metrics.NewGaugeVec(
		&metrics.GaugeOpts{
			Namespace:      namespace,
			Name:           "version_info",
			Help:           "Etcd server's binary version",
			StabilityLevel: metrics.ALPHA,
		},
		[]string{"binary_version"})
	// gatherer re-exports everything scraped from etcd. A family listed in
	// exported produces one output family per rewriter; a listed family with
	// nil rewriters, or a family that is not listed, is exported unchanged.
	gatherer = &monitorGatherer{
		// Rewrite rules for etcd metrics that are exported by default.
		exported: map[string]*exportedMetric{
			// etcd 3.0 metric format for total grpc requests with renamed method and service labels.
			// Only labels are renamed; the metric keeps its 3.0 name.
			"etcd_grpc_requests_total": {
				rewriters: []rewriteFunc{
					func(mf *dto.MetricFamily) (*dto.MetricFamily, error) {
						mf = deepCopyMetricFamily(mf)
						renameLabels(mf, map[string]string{
							"grpc_method":  "method",
							"grpc_service": "service",
						})
						return mf, nil
					},
				},
			},
			// etcd 3.1+ metric format for total grpc requests.
			"grpc_server_handled_total": {
				rewriters: []rewriteFunc{
					// Export the metric exactly as-is. For 3.1+ metrics, we will
					// pass all metrics directly through.
					identity,
					// Write to the etcd 3.0 metric format for backward compatibility:
					// rename to etcd_grpc_requests_total, rename the method/service
					// labels, keep only unary calls, then drop grpc_type and grpc_code
					// and sum the counters that end up with identical label sets.
					func(mf *dto.MetricFamily) (*dto.MetricFamily, error) {
						mf = deepCopyMetricFamily(mf)
						renameMetric(mf, "etcd_grpc_requests_total")
						renameLabels(mf, map[string]string{
							"grpc_method":  "method",
							"grpc_service": "service",
						})
						filterMetricsByLabels(mf, map[string]string{
							"grpc_type": "unary",
						})
						groupCounterMetricsByLabels(mf, map[string]bool{
							"grpc_type": true,
							"grpc_code": true,
						})
						return mf, nil
					},
				},
			},
			// etcd 3.0 metric format for grpc request latencies,
			// rewritten to the etcd 3.1+ format.
			"etcd_grpc_unary_requests_duration_seconds": {
				rewriters: []rewriteFunc{
					func(mf *dto.MetricFamily) (*dto.MetricFamily, error) {
						mf = deepCopyMetricFamily(mf)
						renameMetric(mf, "grpc_server_handling_seconds")
						// The 3.0 metric only covers unary calls, so each series gets a
						// constant grpc_type="unary" label. Only this label is added.
						tpeName := "grpc_type"
						tpeVal := "unary"
						for _, m := range mf.Metric {
							m.Label = append(m.Label, &dto.LabelPair{Name: &tpeName, Value: &tpeVal})
						}
						return mf, nil
					},
				},
			},
			// etcd 3.1+ metric format for grpc request latencies; exported
			// unchanged (no rewriters).
			"grpc_server_handling_seconds": {},
		},
	}
)
  123. // monitorGatherer is a custom metric gatherer for prometheus that exports custom metrics
  124. // defined by this monitor as well as rewritten etcd metrics.
  125. type monitorGatherer struct {
  126. exported map[string]*exportedMetric
  127. }
  128. // exportedMetric identifies a metric that is exported and defines how it is rewritten before
  129. // it is exported.
  130. type exportedMetric struct {
  131. rewriters []rewriteFunc
  132. }
  133. // rewriteFunc rewrites metrics before they are exported.
  134. type rewriteFunc func(mf *dto.MetricFamily) (*dto.MetricFamily, error)
  135. func (m *monitorGatherer) Gather() ([]*dto.MetricFamily, error) {
  136. etcdMetrics, err := scrapeMetrics()
  137. if err != nil {
  138. return nil, err
  139. }
  140. exported, err := m.rewriteExportedMetrics(etcdMetrics)
  141. if err != nil {
  142. return nil, err
  143. }
  144. custom, err := customMetricRegistry.Gather()
  145. if err != nil {
  146. return nil, err
  147. }
  148. result := make([]*dto.MetricFamily, 0, len(exported)+len(custom))
  149. result = append(result, exported...)
  150. result = append(result, custom...)
  151. return result, nil
  152. }
  153. func (m *monitorGatherer) rewriteExportedMetrics(metrics map[string]*dto.MetricFamily) ([]*dto.MetricFamily, error) {
  154. results := make([]*dto.MetricFamily, 0, len(metrics))
  155. for n, mf := range metrics {
  156. if e, ok := m.exported[n]; ok {
  157. // Apply rewrite rules for metrics that have them.
  158. if e.rewriters == nil {
  159. results = append(results, mf)
  160. } else {
  161. for _, rewriter := range e.rewriters {
  162. new, err := rewriter(mf)
  163. if err != nil {
  164. return nil, err
  165. }
  166. results = append(results, new)
  167. }
  168. }
  169. } else {
  170. // Proxy all metrics without any rewrite rules directly.
  171. results = append(results, mf)
  172. }
  173. }
  174. return results, nil
  175. }
  176. // EtcdVersion struct for unmarshalling the json response from etcd's /version endpoint.
  177. type EtcdVersion struct {
  178. BinaryVersion string `json:"etcdserver"`
  179. ClusterVersion string `json:"etcdcluster"`
  180. }
  181. // Function for fetching etcd version info and feeding it to the prometheus metric.
  182. func getVersion(lastSeenBinaryVersion *string) error {
  183. // Create the get request for the etcd version endpoint.
  184. req, err := http.NewRequest("GET", etcdVersionScrapeURI, nil)
  185. if err != nil {
  186. return fmt.Errorf("failed to create GET request for etcd version: %v", err)
  187. }
  188. // Send the get request and receive a response.
  189. client := &http.Client{}
  190. resp, err := client.Do(req)
  191. if err != nil {
  192. return fmt.Errorf("failed to receive GET response for etcd version: %v", err)
  193. }
  194. defer resp.Body.Close()
  195. // Obtain EtcdVersion from the JSON response.
  196. var version EtcdVersion
  197. if err := json.NewDecoder(resp.Body).Decode(&version); err != nil {
  198. return fmt.Errorf("failed to decode etcd version JSON: %v", err)
  199. }
  200. // Return without updating the version if it stayed the same since last time.
  201. if *lastSeenBinaryVersion == version.BinaryVersion {
  202. return nil
  203. }
  204. // Delete the metric for the previous version.
  205. if *lastSeenBinaryVersion != "" {
  206. deleted := etcdVersion.Delete(metrics.Labels{"binary_version": *lastSeenBinaryVersion})
  207. if !deleted {
  208. return errors.New("failed to delete previous version's metric")
  209. }
  210. }
  211. // Record the new version in a metric.
  212. etcdVersion.With(metrics.Labels{
  213. "binary_version": version.BinaryVersion,
  214. }).Set(0)
  215. *lastSeenBinaryVersion = version.BinaryVersion
  216. return nil
  217. }
  218. // Periodically fetches etcd version info.
  219. func getVersionPeriodically(stopCh <-chan struct{}) {
  220. lastSeenBinaryVersion := ""
  221. for {
  222. if err := getVersion(&lastSeenBinaryVersion); err != nil {
  223. klog.Errorf("Failed to fetch etcd version: %v", err)
  224. }
  225. select {
  226. case <-stopCh:
  227. case <-time.After(scrapeTimeout):
  228. }
  229. }
  230. }
  231. // scrapeMetrics scrapes the prometheus metrics from the etcd metrics URI.
  232. func scrapeMetrics() (map[string]*dto.MetricFamily, error) {
  233. req, err := http.NewRequest("GET", etcdMetricsScrapeURI, nil)
  234. if err != nil {
  235. return nil, fmt.Errorf("failed to create GET request for etcd metrics: %v", err)
  236. }
  237. // Send the get request and receive a response.
  238. client := &http.Client{}
  239. resp, err := client.Do(req)
  240. if err != nil {
  241. return nil, fmt.Errorf("failed to receive GET response for etcd metrics: %v", err)
  242. }
  243. defer resp.Body.Close()
  244. // Parse the metrics in text format to a MetricFamily struct.
  245. var textParser expfmt.TextParser
  246. return textParser.TextToMetricFamilies(resp.Body)
  247. }
  248. func renameMetric(mf *dto.MetricFamily, name string) {
  249. mf.Name = &name
  250. }
  251. func renameLabels(mf *dto.MetricFamily, nameMapping map[string]string) {
  252. for _, m := range mf.Metric {
  253. for _, lbl := range m.Label {
  254. if alias, ok := nameMapping[*lbl.Name]; ok {
  255. lbl.Name = &alias
  256. }
  257. }
  258. }
  259. }
  260. func filterMetricsByLabels(mf *dto.MetricFamily, labelValues map[string]string) {
  261. buf := mf.Metric[:0]
  262. for _, m := range mf.Metric {
  263. shouldRemove := false
  264. for _, lbl := range m.Label {
  265. if val, ok := labelValues[*lbl.Name]; ok && val != *lbl.Value {
  266. shouldRemove = true
  267. break
  268. }
  269. }
  270. if !shouldRemove {
  271. buf = append(buf, m)
  272. }
  273. }
  274. mf.Metric = buf
  275. }
  276. func groupCounterMetricsByLabels(mf *dto.MetricFamily, names map[string]bool) {
  277. buf := mf.Metric[:0]
  278. deleteLabels(mf, names)
  279. byLabels := map[string]*dto.Metric{}
  280. for _, m := range mf.Metric {
  281. if metric, ok := byLabels[labelsKey(m.Label)]; ok {
  282. metric.Counter.Value = proto.Float64(*metric.Counter.Value + *m.Counter.Value)
  283. } else {
  284. byLabels[labelsKey(m.Label)] = m
  285. buf = append(buf, m)
  286. }
  287. }
  288. mf.Metric = buf
  289. }
  290. func labelsKey(lbls []*dto.LabelPair) string {
  291. var buf bytes.Buffer
  292. for i, lbl := range lbls {
  293. buf.WriteString(lbl.String())
  294. if i < len(lbls)-1 {
  295. buf.WriteString(",")
  296. }
  297. }
  298. return buf.String()
  299. }
  300. func deleteLabels(mf *dto.MetricFamily, names map[string]bool) {
  301. for _, m := range mf.Metric {
  302. buf := m.Label[:0]
  303. for _, lbl := range m.Label {
  304. shouldRemove := names[*lbl.Name]
  305. if !shouldRemove {
  306. buf = append(buf, lbl)
  307. }
  308. }
  309. m.Label = buf
  310. }
  311. }
  312. func identity(mf *dto.MetricFamily) (*dto.MetricFamily, error) {
  313. return mf, nil
  314. }
  315. func deepCopyMetricFamily(mf *dto.MetricFamily) *dto.MetricFamily {
  316. r := &dto.MetricFamily{}
  317. r.Name = mf.Name
  318. r.Help = mf.Help
  319. r.Type = mf.Type
  320. r.Metric = make([]*dto.Metric, len(mf.Metric))
  321. for i, m := range mf.Metric {
  322. r.Metric[i] = deepCopyMetric(m)
  323. }
  324. return r
  325. }
  326. func deepCopyMetric(m *dto.Metric) *dto.Metric {
  327. r := &dto.Metric{}
  328. r.Label = make([]*dto.LabelPair, len(m.Label))
  329. for i, lp := range m.Label {
  330. r.Label[i] = deepCopyLabelPair(lp)
  331. }
  332. r.Gauge = m.Gauge
  333. r.Counter = m.Counter
  334. r.Summary = m.Summary
  335. r.Untyped = m.Untyped
  336. r.Histogram = m.Histogram
  337. r.TimestampMs = m.TimestampMs
  338. return r
  339. }
  340. func deepCopyLabelPair(lp *dto.LabelPair) *dto.LabelPair {
  341. r := &dto.LabelPair{}
  342. r.Name = lp.Name
  343. r.Value = lp.Value
  344. return r
  345. }
  346. func main() {
  347. // Register the commandline flags passed to the tool.
  348. registerFlags(pflag.CommandLine)
  349. pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
  350. pflag.Parse()
  351. // Register the metrics we defined above with prometheus.
  352. customMetricRegistry.MustRegister(etcdVersion)
  353. // Spawn threads for periodically scraping etcd version metrics.
  354. stopCh := make(chan struct{})
  355. defer close(stopCh)
  356. go getVersionPeriodically(stopCh)
  357. // Serve our metrics on listenAddress/metricsPath.
  358. klog.Infof("Listening on: %v", listenAddress)
  359. http.Handle(metricsPath, metrics.HandlerFor(gatherer, metrics.HandlerOpts{}))
  360. klog.Errorf("Stopped listening/serving metrics: %v", http.ListenAndServe(listenAddress, nil))
  361. }