collector.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package stats // import "github.com/docker/docker/daemon/stats"
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/docker/docker/api/types"
  6. "github.com/docker/docker/container"
  7. "github.com/moby/pubsub"
  8. )
  9. // Collector manages and provides container resource stats
  10. type Collector struct {
  11. m sync.Mutex
  12. cond *sync.Cond
  13. supervisor supervisor
  14. interval time.Duration
  15. publishers map[*container.Container]*pubsub.Publisher
  16. }
  17. // NewCollector creates a stats collector that will poll the supervisor with the specified interval
  18. func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
  19. s := &Collector{
  20. interval: interval,
  21. supervisor: supervisor,
  22. publishers: make(map[*container.Container]*pubsub.Publisher),
  23. }
  24. s.cond = sync.NewCond(&s.m)
  25. return s
  26. }
  27. type supervisor interface {
  28. // GetContainerStats collects all the stats related to a container
  29. GetContainerStats(container *container.Container) (*types.StatsJSON, error)
  30. }
  31. // Collect registers the container with the collector and adds it to
  32. // the event loop for collection on the specified interval returning
  33. // a channel for the subscriber to receive on.
  34. func (s *Collector) Collect(c *container.Container) chan interface{} {
  35. s.cond.L.Lock()
  36. defer s.cond.L.Unlock()
  37. publisher, exists := s.publishers[c]
  38. if !exists {
  39. publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
  40. s.publishers[c] = publisher
  41. }
  42. s.cond.Broadcast()
  43. return publisher.Subscribe()
  44. }
  45. // StopCollection closes the channels for all subscribers and removes
  46. // the container from metrics collection.
  47. func (s *Collector) StopCollection(c *container.Container) {
  48. s.m.Lock()
  49. if publisher, exists := s.publishers[c]; exists {
  50. publisher.Close()
  51. delete(s.publishers, c)
  52. }
  53. s.m.Unlock()
  54. }
  55. // Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
  56. func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
  57. s.m.Lock()
  58. publisher := s.publishers[c]
  59. if publisher != nil {
  60. publisher.Evict(ch)
  61. if publisher.Len() == 0 {
  62. delete(s.publishers, c)
  63. }
  64. }
  65. s.m.Unlock()
  66. }
  67. // Run starts the collectors and will indefinitely collect stats from the supervisor
  68. func (s *Collector) Run() {
  69. type publishersPair struct {
  70. container *container.Container
  71. publisher *pubsub.Publisher
  72. }
  73. // we cannot determine the capacity here.
  74. // it will grow enough in first iteration
  75. var pairs []publishersPair
  76. for {
  77. s.cond.L.Lock()
  78. for len(s.publishers) == 0 {
  79. s.cond.Wait()
  80. }
  81. // it does not make sense in the first iteration,
  82. // but saves allocations in further iterations
  83. pairs = pairs[:0]
  84. for container, publisher := range s.publishers {
  85. // copy pointers here to release the lock ASAP
  86. pairs = append(pairs, publishersPair{container, publisher})
  87. }
  88. s.cond.L.Unlock()
  89. for _, pair := range pairs {
  90. stats, err := s.supervisor.GetContainerStats(pair.container)
  91. if err != nil {
  92. stats = &types.StatsJSON{
  93. Name: pair.container.Name,
  94. ID: pair.container.ID,
  95. }
  96. }
  97. pair.publisher.Publish(*stats)
  98. }
  99. time.Sleep(s.interval)
  100. }
  101. }