inmem_endpoint.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package metrics
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "sort"
  7. "time"
  8. )
// MetricsSummary holds a roll-up of metrics info for a given interval.
type MetricsSummary struct {
	// Timestamp is the interval's time rounded to the second and rendered
	// as a UTC string (see newMetricSummaryFromInterval).
	Timestamp string
	// Gauges holds the interval's gauges; newMetricSummaryFromInterval
	// sorts them by Hash.
	Gauges []GaugeValue
	// Points holds the interval's raw point series; newMetricSummaryFromInterval
	// sorts them by Name.
	Points []PointValue
	// Counters holds the interval's counters as produced by formatSamples,
	// sorted by Hash.
	Counters []SampledValue
	// Samples holds the interval's samples as produced by formatSamples,
	// sorted by Hash.
	Samples []SampledValue
}
// GaugeValue is a single gauge reading together with its labels.
type GaugeValue struct {
	// Name is the gauge's name.
	Name string
	// Hash is excluded from JSON output. newMetricSummaryFromInterval sets
	// it to the key under which the gauge is stored in the interval and
	// uses it as the sort key.
	Hash string `json:"-"`
	// Value is the gauge reading.
	Value float32

	// Labels is excluded from JSON output; newMetricSummaryFromInterval
	// clears it on the copies it returns.
	Labels []Label `json:"-"`
	// DisplayLabels is the label set as a name -> value map. It is what
	// gets serialized under the JSON key "Labels".
	DisplayLabels map[string]string `json:"Labels"`
}
// PointValue is a named series of raw point values.
//
// NOTE: newMetricSummaryFromInterval builds this type with a positional
// literal, so the field order (Name, then Points) must not change.
type PointValue struct {
	// Name is the key the series is stored under in the interval.
	Name string
	// Points is the series of values recorded under Name.
	Points []float32
}
// SampledValue is an aggregated metric (used for both counters and samples)
// together with its labels and derived statistics.
type SampledValue struct {
	// Name is the metric's name.
	Name string
	// Hash is excluded from JSON output. formatSamples sets it to the key
	// under which the value is stored and uses it as the sort key.
	Hash string `json:"-"`
	// AggregateSample is embedded by pointer, so its members are promoted
	// onto SampledValue. deepCopy treats a nil pointer here as possible.
	// The Mean and Stddev fields below take precedence over the aggregate's
	// methods of the same names; those remain reachable through this field.
	*AggregateSample
	// Mean is filled in by formatSamples from AggregateSample.Mean().
	Mean float64
	// Stddev is filled in by formatSamples from AggregateSample.Stddev().
	Stddev float64

	// Labels is excluded from JSON output; formatSamples leaves it unset on
	// the values it returns.
	Labels []Label `json:"-"`
	// DisplayLabels is the label set as a name -> value map. It is what
	// gets serialized under the JSON key "Labels".
	DisplayLabels map[string]string `json:"Labels"`
}
  37. // deepCopy allocates a new instance of AggregateSample
  38. func (source *SampledValue) deepCopy() SampledValue {
  39. dest := *source
  40. if source.AggregateSample != nil {
  41. dest.AggregateSample = &AggregateSample{}
  42. *dest.AggregateSample = *source.AggregateSample
  43. }
  44. return dest
  45. }
  46. // DisplayMetrics returns a summary of the metrics from the most recent finished interval.
  47. func (i *InmemSink) DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
  48. data := i.Data()
  49. var interval *IntervalMetrics
  50. n := len(data)
  51. switch {
  52. case n == 0:
  53. return nil, fmt.Errorf("no metric intervals have been initialized yet")
  54. case n == 1:
  55. // Show the current interval if it's all we have
  56. interval = data[0]
  57. default:
  58. // Show the most recent finished interval if we have one
  59. interval = data[n-2]
  60. }
  61. return newMetricSummaryFromInterval(interval), nil
  62. }
  63. func newMetricSummaryFromInterval(interval *IntervalMetrics) MetricsSummary {
  64. interval.RLock()
  65. defer interval.RUnlock()
  66. summary := MetricsSummary{
  67. Timestamp: interval.Interval.Round(time.Second).UTC().String(),
  68. Gauges: make([]GaugeValue, 0, len(interval.Gauges)),
  69. Points: make([]PointValue, 0, len(interval.Points)),
  70. }
  71. // Format and sort the output of each metric type, so it gets displayed in a
  72. // deterministic order.
  73. for name, points := range interval.Points {
  74. summary.Points = append(summary.Points, PointValue{name, points})
  75. }
  76. sort.Slice(summary.Points, func(i, j int) bool {
  77. return summary.Points[i].Name < summary.Points[j].Name
  78. })
  79. for hash, value := range interval.Gauges {
  80. value.Hash = hash
  81. value.DisplayLabels = make(map[string]string)
  82. for _, label := range value.Labels {
  83. value.DisplayLabels[label.Name] = label.Value
  84. }
  85. value.Labels = nil
  86. summary.Gauges = append(summary.Gauges, value)
  87. }
  88. sort.Slice(summary.Gauges, func(i, j int) bool {
  89. return summary.Gauges[i].Hash < summary.Gauges[j].Hash
  90. })
  91. summary.Counters = formatSamples(interval.Counters)
  92. summary.Samples = formatSamples(interval.Samples)
  93. return summary
  94. }
  95. func formatSamples(source map[string]SampledValue) []SampledValue {
  96. output := make([]SampledValue, 0, len(source))
  97. for hash, sample := range source {
  98. displayLabels := make(map[string]string)
  99. for _, label := range sample.Labels {
  100. displayLabels[label.Name] = label.Value
  101. }
  102. output = append(output, SampledValue{
  103. Name: sample.Name,
  104. Hash: hash,
  105. AggregateSample: sample.AggregateSample,
  106. Mean: sample.AggregateSample.Mean(),
  107. Stddev: sample.AggregateSample.Stddev(),
  108. DisplayLabels: displayLabels,
  109. })
  110. }
  111. sort.Slice(output, func(i, j int) bool {
  112. return output[i].Hash < output[j].Hash
  113. })
  114. return output
  115. }
// Encoder is the sink Stream writes to: Stream calls Encode once per finished
// interval, passing a MetricsSummary, and stops as soon as Encode returns a
// non-nil error. *json.Encoder from encoding/json has a matching method.
type Encoder interface {
	Encode(interface{}) error
}
  119. // Stream writes metrics using encoder.Encode each time an interval ends. Runs
  120. // until the request context is cancelled, or the encoder returns an error.
  121. // The caller is responsible for logging any errors from encoder.
  122. func (i *InmemSink) Stream(ctx context.Context, encoder Encoder) {
  123. interval := i.getInterval()
  124. for {
  125. select {
  126. case <-interval.done:
  127. summary := newMetricSummaryFromInterval(interval)
  128. if err := encoder.Encode(summary); err != nil {
  129. return
  130. }
  131. // update interval to the next one
  132. interval = i.getInterval()
  133. case <-ctx.Done():
  134. return
  135. }
  136. }
  137. }