aggregation_data.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package view
  16. import (
  17. "math"
  18. "time"
  19. "go.opencensus.io/metric/metricdata"
  20. )
// AggregationData represents an aggregated value from a collection.
// They are reported on the view data during exporting.
// Most users won't directly access aggregation data.
type AggregationData interface {
	// isAggregationData is a marker method restricting implementations to this package.
	isAggregationData() bool
	// addSample folds one recorded value into the aggregate. attachments (may
	// be nil) and t are used by implementations that keep exemplars.
	addSample(v float64, attachments map[string]interface{}, t time.Time)
	// clone returns an independent copy of the aggregate.
	clone() AggregationData
	// equal reports whether other holds the same aggregated value(s).
	equal(other AggregationData) bool
	// toPoint converts the aggregate into a metricdata.Point of the requested
	// type; implementations panic on types they do not support.
	toPoint(t metricdata.Type, time time.Time) metricdata.Point
}
// epsilon is the tolerance used when comparing float aggregates for equality
// (compared via squared difference, see the equal implementations below).
const epsilon = 1e-9
// CountData is the aggregated data for the Count aggregation.
// A count aggregation processes data and counts the recordings.
//
// Most users won't directly access count data.
type CountData struct {
	Value int64 // number of samples recorded
}
  39. func (a *CountData) isAggregationData() bool { return true }
  40. func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) {
  41. a.Value = a.Value + 1
  42. }
  43. func (a *CountData) clone() AggregationData {
  44. return &CountData{Value: a.Value}
  45. }
  46. func (a *CountData) equal(other AggregationData) bool {
  47. a2, ok := other.(*CountData)
  48. if !ok {
  49. return false
  50. }
  51. return a.Value == a2.Value
  52. }
  53. func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
  54. switch metricType {
  55. case metricdata.TypeCumulativeInt64:
  56. return metricdata.NewInt64Point(t, a.Value)
  57. default:
  58. panic("unsupported metricdata.Type")
  59. }
  60. }
// SumData is the aggregated data for the Sum aggregation.
// A sum aggregation processes data and sums up the recordings.
//
// Most users won't directly access sum data.
type SumData struct {
	Value float64 // running sum of all recorded values
}
  68. func (a *SumData) isAggregationData() bool { return true }
  69. func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
  70. a.Value += v
  71. }
  72. func (a *SumData) clone() AggregationData {
  73. return &SumData{Value: a.Value}
  74. }
  75. func (a *SumData) equal(other AggregationData) bool {
  76. a2, ok := other.(*SumData)
  77. if !ok {
  78. return false
  79. }
  80. return math.Pow(a.Value-a2.Value, 2) < epsilon
  81. }
  82. func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
  83. switch metricType {
  84. case metricdata.TypeCumulativeInt64:
  85. return metricdata.NewInt64Point(t, int64(a.Value))
  86. case metricdata.TypeCumulativeFloat64:
  87. return metricdata.NewFloat64Point(t, a.Value)
  88. default:
  89. panic("unsupported metricdata.Type")
  90. }
  91. }
// DistributionData is the aggregated data for the
// Distribution aggregation.
//
// Most users won't directly access distribution data.
//
// For a distribution with N bounds, the associated DistributionData will have
// N+1 buckets.
type DistributionData struct {
	Count           int64   // number of data points aggregated
	Min             float64 // minimum value in the distribution
	Max             float64 // max value in the distribution
	Mean            float64 // mean of the distribution
	SumOfSquaredDev float64 // sum of the squared deviation from the mean
	CountPerBucket  []int64 // number of occurrences per bucket
	// ExemplarsPerBucket is a slice the same length as CountPerBucket containing
	// an exemplar for the associated bucket, or nil.
	ExemplarsPerBucket []*metricdata.Exemplar
	bounds             []float64 // histogram distribution of the values
}
  111. func newDistributionData(bounds []float64) *DistributionData {
  112. bucketCount := len(bounds) + 1
  113. return &DistributionData{
  114. CountPerBucket: make([]int64, bucketCount),
  115. ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount),
  116. bounds: bounds,
  117. Min: math.MaxFloat64,
  118. Max: math.SmallestNonzeroFloat64,
  119. }
  120. }
  121. // Sum returns the sum of all samples collected.
  122. func (a *DistributionData) Sum() float64 { return a.Mean * float64(a.Count) }
  123. func (a *DistributionData) variance() float64 {
  124. if a.Count <= 1 {
  125. return 0
  126. }
  127. return a.SumOfSquaredDev / float64(a.Count-1)
  128. }
  129. func (a *DistributionData) isAggregationData() bool { return true }
  130. // TODO(songy23): support exemplar attachments.
  131. func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) {
  132. if v < a.Min {
  133. a.Min = v
  134. }
  135. if v > a.Max {
  136. a.Max = v
  137. }
  138. a.Count++
  139. a.addToBucket(v, attachments, t)
  140. if a.Count == 1 {
  141. a.Mean = v
  142. return
  143. }
  144. oldMean := a.Mean
  145. a.Mean = a.Mean + (v-a.Mean)/float64(a.Count)
  146. a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean)
  147. }
  148. func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) {
  149. var count *int64
  150. var i int
  151. var b float64
  152. for i, b = range a.bounds {
  153. if v < b {
  154. count = &a.CountPerBucket[i]
  155. break
  156. }
  157. }
  158. if count == nil { // Last bucket.
  159. i = len(a.bounds)
  160. count = &a.CountPerBucket[i]
  161. }
  162. *count++
  163. if exemplar := getExemplar(v, attachments, t); exemplar != nil {
  164. a.ExemplarsPerBucket[i] = exemplar
  165. }
  166. }
  167. func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar {
  168. if len(attachments) == 0 {
  169. return nil
  170. }
  171. return &metricdata.Exemplar{
  172. Value: v,
  173. Timestamp: t,
  174. Attachments: attachments,
  175. }
  176. }
  177. func (a *DistributionData) clone() AggregationData {
  178. c := *a
  179. c.CountPerBucket = append([]int64(nil), a.CountPerBucket...)
  180. c.ExemplarsPerBucket = append([]*metricdata.Exemplar(nil), a.ExemplarsPerBucket...)
  181. return &c
  182. }
  183. func (a *DistributionData) equal(other AggregationData) bool {
  184. a2, ok := other.(*DistributionData)
  185. if !ok {
  186. return false
  187. }
  188. if a2 == nil {
  189. return false
  190. }
  191. if len(a.CountPerBucket) != len(a2.CountPerBucket) {
  192. return false
  193. }
  194. for i := range a.CountPerBucket {
  195. if a.CountPerBucket[i] != a2.CountPerBucket[i] {
  196. return false
  197. }
  198. }
  199. return a.Count == a2.Count && a.Min == a2.Min && a.Max == a2.Max && math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
  200. }
  201. func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
  202. switch metricType {
  203. case metricdata.TypeCumulativeDistribution:
  204. buckets := []metricdata.Bucket{}
  205. for i := 0; i < len(a.CountPerBucket); i++ {
  206. buckets = append(buckets, metricdata.Bucket{
  207. Count: a.CountPerBucket[i],
  208. Exemplar: a.ExemplarsPerBucket[i],
  209. })
  210. }
  211. bucketOptions := &metricdata.BucketOptions{Bounds: a.bounds}
  212. val := &metricdata.Distribution{
  213. Count: a.Count,
  214. Sum: a.Sum(),
  215. SumOfSquaredDeviation: a.SumOfSquaredDev,
  216. BucketOptions: bucketOptions,
  217. Buckets: buckets,
  218. }
  219. return metricdata.NewDistributionPoint(t, val)
  220. default:
  221. // TODO: [rghetia] when we have a use case for TypeGaugeDistribution.
  222. panic("unsupported metricdata.Type")
  223. }
  224. }
// LastValueData is the aggregated data for the LastValue aggregation;
// it holds the last value recorded.
type LastValueData struct {
	Value float64 // most recently recorded value
}
  229. func (l *LastValueData) isAggregationData() bool {
  230. return true
  231. }
  232. func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
  233. l.Value = v
  234. }
  235. func (l *LastValueData) clone() AggregationData {
  236. return &LastValueData{l.Value}
  237. }
  238. func (l *LastValueData) equal(other AggregationData) bool {
  239. a2, ok := other.(*LastValueData)
  240. if !ok {
  241. return false
  242. }
  243. return l.Value == a2.Value
  244. }
  245. func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
  246. switch metricType {
  247. case metricdata.TypeGaugeInt64:
  248. return metricdata.NewInt64Point(t, int64(l.Value))
  249. case metricdata.TypeGaugeFloat64:
  250. return metricdata.NewFloat64Point(t, l.Value)
  251. default:
  252. panic("unsupported metricdata.Type")
  253. }
  254. }