stats_collector.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // +build !solaris
  2. package daemon
  3. import (
  4. "bufio"
  5. "sync"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. "github.com/docker/docker/api/types"
  9. "github.com/docker/docker/container"
  10. "github.com/docker/docker/pkg/pubsub"
  11. )
// statsSupervisor is the source the stats collector queries for
// per-container statistics. newStatsCollector stores the Daemon in the
// collector's supervisor field, and run calls GetContainerStats on it
// once per registered container on every collection pass.
type statsSupervisor interface {
	// GetContainerStats collects all the stats related to a container
	GetContainerStats(container *container.Container) (*types.StatsJSON, error)
}
  16. // newStatsCollector returns a new statsCollector that collections
  17. // stats for a registered container at the specified interval.
  18. // The collector allows non-running containers to be added
  19. // and will start processing stats when they are started.
  20. func (daemon *Daemon) newStatsCollector(interval time.Duration) *statsCollector {
  21. s := &statsCollector{
  22. interval: interval,
  23. supervisor: daemon,
  24. publishers: make(map[*container.Container]*pubsub.Publisher),
  25. bufReader: bufio.NewReaderSize(nil, 128),
  26. }
  27. platformNewStatsCollector(s)
  28. go s.run()
  29. return s
  30. }
// statsCollector manages and provides container resource stats
type statsCollector struct {
	// m guards publishers; collect, stopCollection, unsubscribe and run
	// all take it before touching the map.
	m sync.Mutex
	// supervisor is asked for each registered container's stats on every
	// collection pass in run.
	supervisor statsSupervisor
	// interval is the period between collection passes; run hands it to
	// time.Tick.
	interval time.Duration
	// publishers maps each registered container to the publisher that
	// delivers its stats to subscribers.
	publishers map[*container.Container]*pubsub.Publisher
	// bufReader is created by newStatsCollector with a nil source and a
	// 128-byte buffer.
	// NOTE(review): presumably reset onto a real reader by
	// platform-specific code outside this view — confirm.
	bufReader *bufio.Reader
	// The following fields are not set on Windows currently.
	// NOTE(review): presumably populated by platformNewStatsCollector on
	// other platforms — confirm.
	clockTicksPerSecond uint64
	machineMemory       uint64
}
  42. // collect registers the container with the collector and adds it to
  43. // the event loop for collection on the specified interval returning
  44. // a channel for the subscriber to receive on.
  45. func (s *statsCollector) collect(c *container.Container) chan interface{} {
  46. s.m.Lock()
  47. defer s.m.Unlock()
  48. publisher, exists := s.publishers[c]
  49. if !exists {
  50. publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
  51. s.publishers[c] = publisher
  52. }
  53. return publisher.Subscribe()
  54. }
  55. // stopCollection closes the channels for all subscribers and removes
  56. // the container from metrics collection.
  57. func (s *statsCollector) stopCollection(c *container.Container) {
  58. s.m.Lock()
  59. if publisher, exists := s.publishers[c]; exists {
  60. publisher.Close()
  61. delete(s.publishers, c)
  62. }
  63. s.m.Unlock()
  64. }
  65. // unsubscribe removes a specific subscriber from receiving updates for a container's stats.
  66. func (s *statsCollector) unsubscribe(c *container.Container, ch chan interface{}) {
  67. s.m.Lock()
  68. publisher := s.publishers[c]
  69. if publisher != nil {
  70. publisher.Evict(ch)
  71. if publisher.Len() == 0 {
  72. delete(s.publishers, c)
  73. }
  74. }
  75. s.m.Unlock()
  76. }
  77. func (s *statsCollector) run() {
  78. type publishersPair struct {
  79. container *container.Container
  80. publisher *pubsub.Publisher
  81. }
  82. // we cannot determine the capacity here.
  83. // it will grow enough in first iteration
  84. var pairs []publishersPair
  85. for range time.Tick(s.interval) {
  86. // it does not make sense in the first iteration,
  87. // but saves allocations in further iterations
  88. pairs = pairs[:0]
  89. s.m.Lock()
  90. for container, publisher := range s.publishers {
  91. // copy pointers here to release the lock ASAP
  92. pairs = append(pairs, publishersPair{container, publisher})
  93. }
  94. s.m.Unlock()
  95. if len(pairs) == 0 {
  96. continue
  97. }
  98. systemUsage, err := s.getSystemCPUUsage()
  99. if err != nil {
  100. logrus.Errorf("collecting system cpu usage: %v", err)
  101. continue
  102. }
  103. for _, pair := range pairs {
  104. stats, err := s.supervisor.GetContainerStats(pair.container)
  105. if err != nil {
  106. if _, ok := err.(errNotRunning); !ok {
  107. logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
  108. }
  109. continue
  110. }
  111. // FIXME: move to containerd on Linux (not Windows)
  112. stats.CPUStats.SystemUsage = systemUsage
  113. pair.publisher.Publish(*stats)
  114. }
  115. }
  116. }