collector.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package metrics
  2. import (
  3. "context"
  4. "strings"
  5. metrics "github.com/docker/go-metrics"
  6. "github.com/docker/swarmkit/api"
  7. "github.com/docker/swarmkit/manager/state/store"
  8. )
  9. var (
  10. ns = metrics.NewNamespace("swarm", "manager", nil)
  11. nodesMetric metrics.LabeledGauge
  12. )
  13. func init() {
  14. nodesMetric = ns.NewLabeledGauge("nodes", "The number of nodes", "", "state")
  15. for _, state := range api.NodeStatus_State_name {
  16. nodesMetric.WithValues(strings.ToLower(state)).Set(0)
  17. }
  18. metrics.Register(ns)
  19. }
  20. // Collector collects swarmkit metrics
  21. type Collector struct {
  22. store *store.MemoryStore
  23. // stopChan signals to the state machine to stop running.
  24. stopChan chan struct{}
  25. // doneChan is closed when the state machine terminates.
  26. doneChan chan struct{}
  27. }
  28. // NewCollector creates a new metrics collector
  29. func NewCollector(store *store.MemoryStore) *Collector {
  30. return &Collector{
  31. store: store,
  32. stopChan: make(chan struct{}),
  33. doneChan: make(chan struct{}),
  34. }
  35. }
  36. func (c *Collector) updateNodeState(prevNode, newNode *api.Node) {
  37. // Skip updates if nothing changed.
  38. if prevNode != nil && newNode != nil && prevNode.Status.State == newNode.Status.State {
  39. return
  40. }
  41. if prevNode != nil {
  42. nodesMetric.WithValues(strings.ToLower(prevNode.Status.State.String())).Dec(1)
  43. }
  44. if newNode != nil {
  45. nodesMetric.WithValues(strings.ToLower(newNode.Status.State.String())).Inc(1)
  46. }
  47. }
  48. // Run contains the collector event loop
  49. func (c *Collector) Run(ctx context.Context) error {
  50. defer close(c.doneChan)
  51. watcher, cancel, err := store.ViewAndWatch(c.store, func(readTx store.ReadTx) error {
  52. nodes, err := store.FindNodes(readTx, store.All)
  53. if err != nil {
  54. return err
  55. }
  56. for _, node := range nodes {
  57. c.updateNodeState(nil, node)
  58. }
  59. return nil
  60. })
  61. if err != nil {
  62. return err
  63. }
  64. defer cancel()
  65. for {
  66. select {
  67. case event := <-watcher:
  68. switch v := event.(type) {
  69. case api.EventCreateNode:
  70. c.updateNodeState(nil, v.Node)
  71. case api.EventUpdateNode:
  72. c.updateNodeState(v.OldNode, v.Node)
  73. case api.EventDeleteNode:
  74. c.updateNodeState(v.Node, nil)
  75. }
  76. case <-c.stopChan:
  77. return nil
  78. }
  79. }
  80. }
  81. // Stop stops the collector.
  82. func (c *Collector) Stop() {
  83. close(c.stopChan)
  84. <-c.doneChan
  85. // Clean the metrics on exit.
  86. for _, state := range api.NodeStatus_State_name {
  87. nodesMetric.WithValues(strings.ToLower(state)).Set(0)
  88. }
  89. }