inmem.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package metrics
  2. import (
  3. "fmt"
  4. "math"
  5. "strings"
  6. "sync"
  7. "time"
  8. )
  9. // InmemSink provides a MetricSink that does in-memory aggregation
  10. // without sending metrics over a network. It can be embedded within
  11. // an application to provide profiling information.
  12. type InmemSink struct {
  13. // How long is each aggregation interval
  14. interval time.Duration
  15. // Retain controls how many metrics interval we keep
  16. retain time.Duration
  17. // maxIntervals is the maximum length of intervals.
  18. // It is retain / interval.
  19. maxIntervals int
  20. // intervals is a slice of the retained intervals
  21. intervals []*IntervalMetrics
  22. intervalLock sync.RWMutex
  23. }
  24. // IntervalMetrics stores the aggregated metrics
  25. // for a specific interval
  26. type IntervalMetrics struct {
  27. sync.RWMutex
  28. // The start time of the interval
  29. Interval time.Time
  30. // Gauges maps the key to the last set value
  31. Gauges map[string]float32
  32. // Points maps the string to the list of emitted values
  33. // from EmitKey
  34. Points map[string][]float32
  35. // Counters maps the string key to a sum of the counter
  36. // values
  37. Counters map[string]*AggregateSample
  38. // Samples maps the key to an AggregateSample,
  39. // which has the rolled up view of a sample
  40. Samples map[string]*AggregateSample
  41. }
  42. // NewIntervalMetrics creates a new IntervalMetrics for a given interval
  43. func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
  44. return &IntervalMetrics{
  45. Interval: intv,
  46. Gauges: make(map[string]float32),
  47. Points: make(map[string][]float32),
  48. Counters: make(map[string]*AggregateSample),
  49. Samples: make(map[string]*AggregateSample),
  50. }
  51. }
  52. // AggregateSample is used to hold aggregate metrics
  53. // about a sample
  54. type AggregateSample struct {
  55. Count int // The count of emitted pairs
  56. Sum float64 // The sum of values
  57. SumSq float64 // The sum of squared values
  58. Min float64 // Minimum value
  59. Max float64 // Maximum value
  60. }
  61. // Computes a Stddev of the values
  62. func (a *AggregateSample) Stddev() float64 {
  63. num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2)
  64. div := float64(a.Count * (a.Count - 1))
  65. if div == 0 {
  66. return 0
  67. }
  68. return math.Sqrt(num / div)
  69. }
  70. // Computes a mean of the values
  71. func (a *AggregateSample) Mean() float64 {
  72. if a.Count == 0 {
  73. return 0
  74. }
  75. return a.Sum / float64(a.Count)
  76. }
  77. // Ingest is used to update a sample
  78. func (a *AggregateSample) Ingest(v float64) {
  79. a.Count++
  80. a.Sum += v
  81. a.SumSq += (v * v)
  82. if v < a.Min || a.Count == 1 {
  83. a.Min = v
  84. }
  85. if v > a.Max || a.Count == 1 {
  86. a.Max = v
  87. }
  88. }
  89. func (a *AggregateSample) String() string {
  90. if a.Count == 0 {
  91. return "Count: 0"
  92. } else if a.Stddev() == 0 {
  93. return fmt.Sprintf("Count: %d Sum: %0.3f", a.Count, a.Sum)
  94. } else {
  95. return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f",
  96. a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum)
  97. }
  98. }
  99. // NewInmemSink is used to construct a new in-memory sink.
  100. // Uses an aggregation interval and maximum retention period.
  101. func NewInmemSink(interval, retain time.Duration) *InmemSink {
  102. i := &InmemSink{
  103. interval: interval,
  104. retain: retain,
  105. maxIntervals: int(retain / interval),
  106. }
  107. i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
  108. return i
  109. }
  110. func (i *InmemSink) SetGauge(key []string, val float32) {
  111. k := i.flattenKey(key)
  112. intv := i.getInterval()
  113. intv.Lock()
  114. defer intv.Unlock()
  115. intv.Gauges[k] = val
  116. }
  117. func (i *InmemSink) EmitKey(key []string, val float32) {
  118. k := i.flattenKey(key)
  119. intv := i.getInterval()
  120. intv.Lock()
  121. defer intv.Unlock()
  122. vals := intv.Points[k]
  123. intv.Points[k] = append(vals, val)
  124. }
  125. func (i *InmemSink) IncrCounter(key []string, val float32) {
  126. k := i.flattenKey(key)
  127. intv := i.getInterval()
  128. intv.Lock()
  129. defer intv.Unlock()
  130. agg := intv.Counters[k]
  131. if agg == nil {
  132. agg = &AggregateSample{}
  133. intv.Counters[k] = agg
  134. }
  135. agg.Ingest(float64(val))
  136. }
  137. func (i *InmemSink) AddSample(key []string, val float32) {
  138. k := i.flattenKey(key)
  139. intv := i.getInterval()
  140. intv.Lock()
  141. defer intv.Unlock()
  142. agg := intv.Samples[k]
  143. if agg == nil {
  144. agg = &AggregateSample{}
  145. intv.Samples[k] = agg
  146. }
  147. agg.Ingest(float64(val))
  148. }
  149. // Data is used to retrieve all the aggregated metrics
  150. // Intervals may be in use, and a read lock should be acquired
  151. func (i *InmemSink) Data() []*IntervalMetrics {
  152. // Get the current interval, forces creation
  153. i.getInterval()
  154. i.intervalLock.RLock()
  155. defer i.intervalLock.RUnlock()
  156. intervals := make([]*IntervalMetrics, len(i.intervals))
  157. copy(intervals, i.intervals)
  158. return intervals
  159. }
  160. func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
  161. i.intervalLock.RLock()
  162. defer i.intervalLock.RUnlock()
  163. n := len(i.intervals)
  164. if n > 0 && i.intervals[n-1].Interval == intv {
  165. return i.intervals[n-1]
  166. }
  167. return nil
  168. }
  169. func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
  170. i.intervalLock.Lock()
  171. defer i.intervalLock.Unlock()
  172. // Check for an existing interval
  173. n := len(i.intervals)
  174. if n > 0 && i.intervals[n-1].Interval == intv {
  175. return i.intervals[n-1]
  176. }
  177. // Add the current interval
  178. current := NewIntervalMetrics(intv)
  179. i.intervals = append(i.intervals, current)
  180. n++
  181. // Truncate the intervals if they are too long
  182. if n >= i.maxIntervals {
  183. copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
  184. i.intervals = i.intervals[:i.maxIntervals]
  185. }
  186. return current
  187. }
  188. // getInterval returns the current interval to write to
  189. func (i *InmemSink) getInterval() *IntervalMetrics {
  190. intv := time.Now().Truncate(i.interval)
  191. if m := i.getExistingInterval(intv); m != nil {
  192. return m
  193. }
  194. return i.createInterval(intv)
  195. }
  196. // Flattens the key for formatting, removes spaces
  197. func (i *InmemSink) flattenKey(parts []string) string {
  198. joined := strings.Join(parts, ".")
  199. return strings.Replace(joined, " ", "_", -1)
  200. }