inmem_signal.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package metrics
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "os"
  7. "os/signal"
  8. "sync"
  9. "syscall"
  10. )
  11. // InmemSignal is used to listen for a given signal, and when received,
  12. // to dump the current metrics from the InmemSink to an io.Writer
  13. type InmemSignal struct {
  14. signal syscall.Signal
  15. inm *InmemSink
  16. w io.Writer
  17. sigCh chan os.Signal
  18. stop bool
  19. stopCh chan struct{}
  20. stopLock sync.Mutex
  21. }
  22. // NewInmemSignal creates a new InmemSignal which listens for a given signal,
  23. // and dumps the current metrics out to a writer
  24. func NewInmemSignal(inmem *InmemSink, sig syscall.Signal, w io.Writer) *InmemSignal {
  25. i := &InmemSignal{
  26. signal: sig,
  27. inm: inmem,
  28. w: w,
  29. sigCh: make(chan os.Signal, 1),
  30. stopCh: make(chan struct{}),
  31. }
  32. signal.Notify(i.sigCh, sig)
  33. go i.run()
  34. return i
  35. }
  36. // DefaultInmemSignal returns a new InmemSignal that responds to SIGUSR1
  37. // and writes output to stderr. Windows uses SIGBREAK
  38. func DefaultInmemSignal(inmem *InmemSink) *InmemSignal {
  39. return NewInmemSignal(inmem, DefaultSignal, os.Stderr)
  40. }
  41. // Stop is used to stop the InmemSignal from listening
  42. func (i *InmemSignal) Stop() {
  43. i.stopLock.Lock()
  44. defer i.stopLock.Unlock()
  45. if i.stop {
  46. return
  47. }
  48. i.stop = true
  49. close(i.stopCh)
  50. signal.Stop(i.sigCh)
  51. }
  52. // run is a long running routine that handles signals
  53. func (i *InmemSignal) run() {
  54. for {
  55. select {
  56. case <-i.sigCh:
  57. i.dumpStats()
  58. case <-i.stopCh:
  59. return
  60. }
  61. }
  62. }
  63. // dumpStats is used to dump the data to output writer
  64. func (i *InmemSignal) dumpStats() {
  65. buf := bytes.NewBuffer(nil)
  66. data := i.inm.Data()
  67. // Skip the last period which is still being aggregated
  68. for i := 0; i < len(data)-1; i++ {
  69. intv := data[i]
  70. intv.RLock()
  71. for name, val := range intv.Gauges {
  72. fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val)
  73. }
  74. for name, vals := range intv.Points {
  75. for _, val := range vals {
  76. fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val)
  77. }
  78. }
  79. for name, agg := range intv.Counters {
  80. fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg)
  81. }
  82. for name, agg := range intv.Samples {
  83. fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg)
  84. }
  85. intv.RUnlock()
  86. }
  87. // Write out the bytes
  88. i.w.Write(buf.Bytes())
  89. }