123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- package metrics
- import (
- "bytes"
- "fmt"
- "log"
- "net"
- "strings"
- "time"
- )
- const (
- // statsdMaxLen is the maximum size of a packet
- // to send to statsd
- statsdMaxLen = 1400
- )
- // StatsdSink provides a MetricSink that can be used
- // with a statsite or statsd metrics server. It uses
- // only UDP packets, while StatsiteSink uses TCP.
- type StatsdSink struct {
- addr string
- metricQueue chan string
- }
- // NewStatsdSink is used to create a new StatsdSink
- func NewStatsdSink(addr string) (*StatsdSink, error) {
- s := &StatsdSink{
- addr: addr,
- metricQueue: make(chan string, 4096),
- }
- go s.flushMetrics()
- return s, nil
- }
- // Close is used to stop flushing to statsd
- func (s *StatsdSink) Shutdown() {
- close(s.metricQueue)
- }
- func (s *StatsdSink) SetGauge(key []string, val float32) {
- flatKey := s.flattenKey(key)
- s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
- }
- func (s *StatsdSink) EmitKey(key []string, val float32) {
- flatKey := s.flattenKey(key)
- s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
- }
- func (s *StatsdSink) IncrCounter(key []string, val float32) {
- flatKey := s.flattenKey(key)
- s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
- }
- func (s *StatsdSink) AddSample(key []string, val float32) {
- flatKey := s.flattenKey(key)
- s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
- }
- // Flattens the key for formatting, removes spaces
- func (s *StatsdSink) flattenKey(parts []string) string {
- joined := strings.Join(parts, ".")
- return strings.Map(func(r rune) rune {
- switch r {
- case ':':
- fallthrough
- case ' ':
- return '_'
- default:
- return r
- }
- }, joined)
- }
- // Does a non-blocking push to the metrics queue
- func (s *StatsdSink) pushMetric(m string) {
- select {
- case s.metricQueue <- m:
- default:
- }
- }
- // Flushes metrics
- func (s *StatsdSink) flushMetrics() {
- var sock net.Conn
- var err error
- var wait <-chan time.Time
- ticker := time.NewTicker(flushInterval)
- defer ticker.Stop()
- CONNECT:
- // Create a buffer
- buf := bytes.NewBuffer(nil)
- // Attempt to connect
- sock, err = net.Dial("udp", s.addr)
- if err != nil {
- log.Printf("[ERR] Error connecting to statsd! Err: %s", err)
- goto WAIT
- }
- for {
- select {
- case metric, ok := <-s.metricQueue:
- // Get a metric from the queue
- if !ok {
- goto QUIT
- }
- // Check if this would overflow the packet size
- if len(metric)+buf.Len() > statsdMaxLen {
- _, err := sock.Write(buf.Bytes())
- buf.Reset()
- if err != nil {
- log.Printf("[ERR] Error writing to statsd! Err: %s", err)
- goto WAIT
- }
- }
- // Append to the buffer
- buf.WriteString(metric)
- case <-ticker.C:
- if buf.Len() == 0 {
- continue
- }
- _, err := sock.Write(buf.Bytes())
- buf.Reset()
- if err != nil {
- log.Printf("[ERR] Error flushing to statsd! Err: %s", err)
- goto WAIT
- }
- }
- }
- WAIT:
- // Wait for a while
- wait = time.After(time.Duration(5) * time.Second)
- for {
- select {
- // Dequeue the messages to avoid backlog
- case _, ok := <-s.metricQueue:
- if !ok {
- goto QUIT
- }
- case <-wait:
- goto CONNECT
- }
- }
- QUIT:
- s.metricQueue = nil
- }
|