statsite.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package metrics
  2. import (
  3. "bufio"
  4. "fmt"
  5. "log"
  6. "net"
  7. "strings"
  8. "time"
  9. )
  10. const (
  11. // We force flush the statsite metrics after this period of
  12. // inactivity. Prevents stats from getting stuck in a buffer
  13. // forever.
  14. flushInterval = 100 * time.Millisecond
  15. )
  16. // StatsiteSink provides a MetricSink that can be used with a
  17. // statsite metrics server
  18. type StatsiteSink struct {
  19. addr string
  20. metricQueue chan string
  21. }
  22. // NewStatsiteSink is used to create a new StatsiteSink
  23. func NewStatsiteSink(addr string) (*StatsiteSink, error) {
  24. s := &StatsiteSink{
  25. addr: addr,
  26. metricQueue: make(chan string, 4096),
  27. }
  28. go s.flushMetrics()
  29. return s, nil
  30. }
  31. // Close is used to stop flushing to statsite
  32. func (s *StatsiteSink) Shutdown() {
  33. close(s.metricQueue)
  34. }
  35. func (s *StatsiteSink) SetGauge(key []string, val float32) {
  36. flatKey := s.flattenKey(key)
  37. s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
  38. }
  39. func (s *StatsiteSink) EmitKey(key []string, val float32) {
  40. flatKey := s.flattenKey(key)
  41. s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
  42. }
  43. func (s *StatsiteSink) IncrCounter(key []string, val float32) {
  44. flatKey := s.flattenKey(key)
  45. s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
  46. }
  47. func (s *StatsiteSink) AddSample(key []string, val float32) {
  48. flatKey := s.flattenKey(key)
  49. s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
  50. }
  51. // Flattens the key for formatting, removes spaces
  52. func (s *StatsiteSink) flattenKey(parts []string) string {
  53. joined := strings.Join(parts, ".")
  54. return strings.Map(func(r rune) rune {
  55. switch r {
  56. case ':':
  57. fallthrough
  58. case ' ':
  59. return '_'
  60. default:
  61. return r
  62. }
  63. }, joined)
  64. }
  65. // Does a non-blocking push to the metrics queue
  66. func (s *StatsiteSink) pushMetric(m string) {
  67. select {
  68. case s.metricQueue <- m:
  69. default:
  70. }
  71. }
  72. // Flushes metrics
  73. func (s *StatsiteSink) flushMetrics() {
  74. var sock net.Conn
  75. var err error
  76. var wait <-chan time.Time
  77. var buffered *bufio.Writer
  78. ticker := time.NewTicker(flushInterval)
  79. defer ticker.Stop()
  80. CONNECT:
  81. // Attempt to connect
  82. sock, err = net.Dial("tcp", s.addr)
  83. if err != nil {
  84. log.Printf("[ERR] Error connecting to statsite! Err: %s", err)
  85. goto WAIT
  86. }
  87. // Create a buffered writer
  88. buffered = bufio.NewWriter(sock)
  89. for {
  90. select {
  91. case metric, ok := <-s.metricQueue:
  92. // Get a metric from the queue
  93. if !ok {
  94. goto QUIT
  95. }
  96. // Try to send to statsite
  97. _, err := buffered.Write([]byte(metric))
  98. if err != nil {
  99. log.Printf("[ERR] Error writing to statsite! Err: %s", err)
  100. goto WAIT
  101. }
  102. case <-ticker.C:
  103. if err := buffered.Flush(); err != nil {
  104. log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
  105. goto WAIT
  106. }
  107. }
  108. }
  109. WAIT:
  110. // Wait for a while
  111. wait = time.After(time.Duration(5) * time.Second)
  112. for {
  113. select {
  114. // Dequeue the messages to avoid backlog
  115. case _, ok := <-s.metricQueue:
  116. if !ok {
  117. goto QUIT
  118. }
  119. case <-wait:
  120. goto CONNECT
  121. }
  122. }
  123. QUIT:
  124. s.metricQueue = nil
  125. }