statsd.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package metrics
  2. import (
  3. "bytes"
  4. "fmt"
  5. "log"
  6. "net"
  7. "strings"
  8. "time"
  9. )
  // statsdMaxLen caps, in bytes, how much metric data is batched into a
  // single packet written to statsd.
  const statsdMaxLen = 1400
  // StatsdSink is a MetricSink that ships metrics to a statsd or statsite
  // server. It sends UDP packets only; StatsiteSink is the TCP counterpart.
  type StatsdSink struct {
  	// addr is the address that the flusher dials over UDP.
  	addr string

  	// metricQueue carries formatted metric lines from the emitting
  	// methods to the background flusher goroutine.
  	metricQueue chan string
  }
  22. // NewStatsdSink is used to create a new StatsdSink
  23. func NewStatsdSink(addr string) (*StatsdSink, error) {
  24. s := &StatsdSink{
  25. addr: addr,
  26. metricQueue: make(chan string, 4096),
  27. }
  28. go s.flushMetrics()
  29. return s, nil
  30. }
  31. // Close is used to stop flushing to statsd
  32. func (s *StatsdSink) Shutdown() {
  33. close(s.metricQueue)
  34. }
  35. func (s *StatsdSink) SetGauge(key []string, val float32) {
  36. flatKey := s.flattenKey(key)
  37. s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
  38. }
  39. func (s *StatsdSink) EmitKey(key []string, val float32) {
  40. flatKey := s.flattenKey(key)
  41. s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
  42. }
  43. func (s *StatsdSink) IncrCounter(key []string, val float32) {
  44. flatKey := s.flattenKey(key)
  45. s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
  46. }
  47. func (s *StatsdSink) AddSample(key []string, val float32) {
  48. flatKey := s.flattenKey(key)
  49. s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
  50. }
  51. // Flattens the key for formatting, removes spaces
  52. func (s *StatsdSink) flattenKey(parts []string) string {
  53. joined := strings.Join(parts, ".")
  54. return strings.Map(func(r rune) rune {
  55. switch r {
  56. case ':':
  57. fallthrough
  58. case ' ':
  59. return '_'
  60. default:
  61. return r
  62. }
  63. }, joined)
  64. }
  65. // Does a non-blocking push to the metrics queue
  66. func (s *StatsdSink) pushMetric(m string) {
  67. select {
  68. case s.metricQueue <- m:
  69. default:
  70. }
  71. }
  72. // Flushes metrics
  73. func (s *StatsdSink) flushMetrics() {
  74. var sock net.Conn
  75. var err error
  76. var wait <-chan time.Time
  77. ticker := time.NewTicker(flushInterval)
  78. defer ticker.Stop()
  79. CONNECT:
  80. // Create a buffer
  81. buf := bytes.NewBuffer(nil)
  82. // Attempt to connect
  83. sock, err = net.Dial("udp", s.addr)
  84. if err != nil {
  85. log.Printf("[ERR] Error connecting to statsd! Err: %s", err)
  86. goto WAIT
  87. }
  88. for {
  89. select {
  90. case metric, ok := <-s.metricQueue:
  91. // Get a metric from the queue
  92. if !ok {
  93. goto QUIT
  94. }
  95. // Check if this would overflow the packet size
  96. if len(metric)+buf.Len() > statsdMaxLen {
  97. _, err := sock.Write(buf.Bytes())
  98. buf.Reset()
  99. if err != nil {
  100. log.Printf("[ERR] Error writing to statsd! Err: %s", err)
  101. goto WAIT
  102. }
  103. }
  104. // Append to the buffer
  105. buf.WriteString(metric)
  106. case <-ticker.C:
  107. if buf.Len() == 0 {
  108. continue
  109. }
  110. _, err := sock.Write(buf.Bytes())
  111. buf.Reset()
  112. if err != nil {
  113. log.Printf("[ERR] Error flushing to statsd! Err: %s", err)
  114. goto WAIT
  115. }
  116. }
  117. }
  118. WAIT:
  119. // Wait for a while
  120. wait = time.After(time.Duration(5) * time.Second)
  121. for {
  122. select {
  123. // Dequeue the messages to avoid backlog
  124. case _, ok := <-s.metricQueue:
  125. if !ok {
  126. goto QUIT
  127. }
  128. case <-wait:
  129. goto CONNECT
  130. }
  131. }
  132. QUIT:
  133. s.metricQueue = nil
  134. }