sink.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package metrics
  2. import (
  3. "fmt"
  4. "net/url"
  5. )
  6. // The MetricSink interface is used to transmit metrics information
  7. // to an external system
  8. type MetricSink interface {
  9. // A Gauge should retain the last value it is set to
  10. SetGauge(key []string, val float32)
  11. SetGaugeWithLabels(key []string, val float32, labels []Label)
  12. // Should emit a Key/Value pair for each call
  13. EmitKey(key []string, val float32)
  14. // Counters should accumulate values
  15. IncrCounter(key []string, val float32)
  16. IncrCounterWithLabels(key []string, val float32, labels []Label)
  17. // Samples are for timing information, where quantiles are used
  18. AddSample(key []string, val float32)
  19. AddSampleWithLabels(key []string, val float32, labels []Label)
  20. }
  21. type ShutdownSink interface {
  22. MetricSink
  23. // Shutdown the metric sink, flush metrics to storage, and cleanup resources.
  24. // Called immediately prior to application exit. Implementations must block
  25. // until metrics are flushed to storage.
  26. Shutdown()
  27. }
  28. // BlackholeSink is used to just blackhole messages
  29. type BlackholeSink struct{}
  30. func (*BlackholeSink) SetGauge(key []string, val float32) {}
  31. func (*BlackholeSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {}
  32. func (*BlackholeSink) EmitKey(key []string, val float32) {}
  33. func (*BlackholeSink) IncrCounter(key []string, val float32) {}
  34. func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {}
  35. func (*BlackholeSink) AddSample(key []string, val float32) {}
  36. func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {}
  37. // FanoutSink is used to sink to fanout values to multiple sinks
  38. type FanoutSink []MetricSink
  39. func (fh FanoutSink) SetGauge(key []string, val float32) {
  40. fh.SetGaugeWithLabels(key, val, nil)
  41. }
  42. func (fh FanoutSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
  43. for _, s := range fh {
  44. s.SetGaugeWithLabels(key, val, labels)
  45. }
  46. }
  47. func (fh FanoutSink) EmitKey(key []string, val float32) {
  48. for _, s := range fh {
  49. s.EmitKey(key, val)
  50. }
  51. }
  52. func (fh FanoutSink) IncrCounter(key []string, val float32) {
  53. fh.IncrCounterWithLabels(key, val, nil)
  54. }
  55. func (fh FanoutSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
  56. for _, s := range fh {
  57. s.IncrCounterWithLabels(key, val, labels)
  58. }
  59. }
  60. func (fh FanoutSink) AddSample(key []string, val float32) {
  61. fh.AddSampleWithLabels(key, val, nil)
  62. }
  63. func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
  64. for _, s := range fh {
  65. s.AddSampleWithLabels(key, val, labels)
  66. }
  67. }
  68. func (fh FanoutSink) Shutdown() {
  69. for _, s := range fh {
  70. if ss, ok := s.(ShutdownSink); ok {
  71. ss.Shutdown()
  72. }
  73. }
  74. }
  75. // sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided
  76. // by each sink type
  77. type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
  78. // sinkRegistry supports the generic NewMetricSink function by mapping URL
  79. // schemes to metric sink factory functions
  80. var sinkRegistry = map[string]sinkURLFactoryFunc{
  81. "statsd": NewStatsdSinkFromURL,
  82. "statsite": NewStatsiteSinkFromURL,
  83. "inmem": NewInmemSinkFromURL,
  84. }
  85. // NewMetricSinkFromURL allows a generic URL input to configure any of the
  86. // supported sinks. The scheme of the URL identifies the type of the sink, the
  87. // and query parameters are used to set options.
  88. //
  89. // "statsd://" - Initializes a StatsdSink. The host and port are passed through
  90. // as the "addr" of the sink
  91. //
  92. // "statsite://" - Initializes a StatsiteSink. The host and port become the
  93. // "addr" of the sink
  94. //
  95. // "inmem://" - Initializes an InmemSink. The host and port are ignored. The
  96. // "interval" and "duration" query parameters must be specified with valid
  97. // durations, see NewInmemSink for details.
  98. func NewMetricSinkFromURL(urlStr string) (MetricSink, error) {
  99. u, err := url.Parse(urlStr)
  100. if err != nil {
  101. return nil, err
  102. }
  103. sinkURLFactoryFunc := sinkRegistry[u.Scheme]
  104. if sinkURLFactoryFunc == nil {
  105. return nil, fmt.Errorf(
  106. "cannot create metric sink, unrecognized sink name: %q", u.Scheme)
  107. }
  108. return sinkURLFactoryFunc(u)
  109. }