statsd.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package metrics
  2. import (
  3. "bytes"
  4. "fmt"
  5. "log"
  6. "net"
  7. "net/url"
  8. "strings"
  9. "time"
  10. )
  11. const (
  12. // statsdMaxLen is the maximum size of a packet
  13. // to send to statsd
  14. statsdMaxLen = 1400
  15. )
  16. // StatsdSink provides a MetricSink that can be used
  17. // with a statsite or statsd metrics server. It uses
  18. // only UDP packets, while StatsiteSink uses TCP.
  19. type StatsdSink struct {
  20. addr string
  21. metricQueue chan string
  22. }
  23. // NewStatsdSinkFromURL creates an StatsdSink from a URL. It is used
  24. // (and tested) from NewMetricSinkFromURL.
  25. func NewStatsdSinkFromURL(u *url.URL) (MetricSink, error) {
  26. return NewStatsdSink(u.Host)
  27. }
  28. // NewStatsdSink is used to create a new StatsdSink
  29. func NewStatsdSink(addr string) (*StatsdSink, error) {
  30. s := &StatsdSink{
  31. addr: addr,
  32. metricQueue: make(chan string, 4096),
  33. }
  34. go s.flushMetrics()
  35. return s, nil
  36. }
  37. // Close is used to stop flushing to statsd
  38. func (s *StatsdSink) Shutdown() {
  39. close(s.metricQueue)
  40. }
  41. func (s *StatsdSink) SetGauge(key []string, val float32) {
  42. flatKey := s.flattenKey(key)
  43. s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
  44. }
  45. func (s *StatsdSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
  46. flatKey := s.flattenKeyLabels(key, labels)
  47. s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
  48. }
  49. func (s *StatsdSink) EmitKey(key []string, val float32) {
  50. flatKey := s.flattenKey(key)
  51. s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
  52. }
  53. func (s *StatsdSink) IncrCounter(key []string, val float32) {
  54. flatKey := s.flattenKey(key)
  55. s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
  56. }
  57. func (s *StatsdSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
  58. flatKey := s.flattenKeyLabels(key, labels)
  59. s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
  60. }
  61. func (s *StatsdSink) AddSample(key []string, val float32) {
  62. flatKey := s.flattenKey(key)
  63. s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
  64. }
  65. func (s *StatsdSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
  66. flatKey := s.flattenKeyLabels(key, labels)
  67. s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
  68. }
  69. // Flattens the key for formatting, removes spaces
  70. func (s *StatsdSink) flattenKey(parts []string) string {
  71. joined := strings.Join(parts, ".")
  72. return strings.Map(func(r rune) rune {
  73. switch r {
  74. case ':':
  75. fallthrough
  76. case ' ':
  77. return '_'
  78. default:
  79. return r
  80. }
  81. }, joined)
  82. }
  83. // Flattens the key along with labels for formatting, removes spaces
  84. func (s *StatsdSink) flattenKeyLabels(parts []string, labels []Label) string {
  85. for _, label := range labels {
  86. parts = append(parts, label.Value)
  87. }
  88. return s.flattenKey(parts)
  89. }
  90. // Does a non-blocking push to the metrics queue
  91. func (s *StatsdSink) pushMetric(m string) {
  92. select {
  93. case s.metricQueue <- m:
  94. default:
  95. }
  96. }
  97. // Flushes metrics
  98. func (s *StatsdSink) flushMetrics() {
  99. var sock net.Conn
  100. var err error
  101. var wait <-chan time.Time
  102. ticker := time.NewTicker(flushInterval)
  103. defer ticker.Stop()
  104. CONNECT:
  105. // Create a buffer
  106. buf := bytes.NewBuffer(nil)
  107. // Attempt to connect
  108. sock, err = net.Dial("udp", s.addr)
  109. if err != nil {
  110. log.Printf("[ERR] Error connecting to statsd! Err: %s", err)
  111. goto WAIT
  112. }
  113. for {
  114. select {
  115. case metric, ok := <-s.metricQueue:
  116. // Get a metric from the queue
  117. if !ok {
  118. goto QUIT
  119. }
  120. // Check if this would overflow the packet size
  121. if len(metric)+buf.Len() > statsdMaxLen {
  122. _, err := sock.Write(buf.Bytes())
  123. buf.Reset()
  124. if err != nil {
  125. log.Printf("[ERR] Error writing to statsd! Err: %s", err)
  126. goto WAIT
  127. }
  128. }
  129. // Append to the buffer
  130. buf.WriteString(metric)
  131. case <-ticker.C:
  132. if buf.Len() == 0 {
  133. continue
  134. }
  135. _, err := sock.Write(buf.Bytes())
  136. buf.Reset()
  137. if err != nil {
  138. log.Printf("[ERR] Error flushing to statsd! Err: %s", err)
  139. goto WAIT
  140. }
  141. }
  142. }
  143. WAIT:
  144. // Wait for a while
  145. wait = time.After(time.Duration(5) * time.Second)
  146. for {
  147. select {
  148. // Dequeue the messages to avoid backlog
  149. case _, ok := <-s.metricQueue:
  150. if !ok {
  151. goto QUIT
  152. }
  153. case <-wait:
  154. goto CONNECT
  155. }
  156. }
  157. QUIT:
  158. s.metricQueue = nil
  159. }