stats_collector_unix.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. // +build !windows
  2. package daemon
  3. import (
  4. "bufio"
  5. "fmt"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/docker/daemon/execdriver"
  13. "github.com/docker/docker/pkg/pubsub"
  14. "github.com/opencontainers/runc/libcontainer/system"
  15. )
  16. // newStatsCollector returns a new statsCollector that collections
  17. // network and cgroup stats for a registered container at the specified
  18. // interval. The collector allows non-running containers to be added
  19. // and will start processing stats when they are started.
  20. func newStatsCollector(interval time.Duration) *statsCollector {
  21. s := &statsCollector{
  22. interval: interval,
  23. publishers: make(map[*Container]*pubsub.Publisher),
  24. clockTicksPerSecond: uint64(system.GetClockTicks()),
  25. bufReader: bufio.NewReaderSize(nil, 128),
  26. }
  27. go s.run()
  28. return s
  29. }
// statsCollector manages and provides container resource stats.
type statsCollector struct {
	// m guards publishers. It is held by collect, stopCollection and
	// unsubscribe, and briefly by run while it snapshots the map.
	m sync.Mutex
	// interval is the period between collection rounds in run.
	interval time.Duration
	// clockTicksPerSecond converts the clock-tick counts read from
	// /proc/stat into nanoseconds in getSystemCPUUsage.
	clockTicksPerSecond uint64
	// publishers maps each registered container to the publisher that
	// delivers its stats to subscribers.
	publishers map[*Container]*pubsub.Publisher
	// bufReader is reused by getSystemCPUUsage to read /proc/stat without
	// allocating a new reader on every round. It is not guarded by m; within
	// this file it is only used from the run goroutine.
	bufReader *bufio.Reader
}
  38. // collect registers the container with the collector and adds it to
  39. // the event loop for collection on the specified interval returning
  40. // a channel for the subscriber to receive on.
  41. func (s *statsCollector) collect(c *Container) chan interface{} {
  42. s.m.Lock()
  43. defer s.m.Unlock()
  44. publisher, exists := s.publishers[c]
  45. if !exists {
  46. publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
  47. s.publishers[c] = publisher
  48. }
  49. return publisher.Subscribe()
  50. }
  51. // stopCollection closes the channels for all subscribers and removes
  52. // the container from metrics collection.
  53. func (s *statsCollector) stopCollection(c *Container) {
  54. s.m.Lock()
  55. if publisher, exists := s.publishers[c]; exists {
  56. publisher.Close()
  57. delete(s.publishers, c)
  58. }
  59. s.m.Unlock()
  60. }
  61. // unsubscribe removes a specific subscriber from receiving updates for a container's stats.
  62. func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) {
  63. s.m.Lock()
  64. publisher := s.publishers[c]
  65. if publisher != nil {
  66. publisher.Evict(ch)
  67. if publisher.Len() == 0 {
  68. delete(s.publishers, c)
  69. }
  70. }
  71. s.m.Unlock()
  72. }
  73. func (s *statsCollector) run() {
  74. type publishersPair struct {
  75. container *Container
  76. publisher *pubsub.Publisher
  77. }
  78. // we cannot determine the capacity here.
  79. // it will grow enough in first iteration
  80. var pairs []publishersPair
  81. for range time.Tick(s.interval) {
  82. // it does not make sense in the first iteration,
  83. // but saves allocations in further iterations
  84. pairs = pairs[:0]
  85. s.m.Lock()
  86. for container, publisher := range s.publishers {
  87. // copy pointers here to release the lock ASAP
  88. pairs = append(pairs, publishersPair{container, publisher})
  89. }
  90. s.m.Unlock()
  91. if len(pairs) == 0 {
  92. continue
  93. }
  94. systemUsage, err := s.getSystemCPUUsage()
  95. if err != nil {
  96. logrus.Errorf("collecting system cpu usage: %v", err)
  97. continue
  98. }
  99. for _, pair := range pairs {
  100. stats, err := pair.container.stats()
  101. if err != nil {
  102. if err != execdriver.ErrNotRunning {
  103. logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
  104. }
  105. continue
  106. }
  107. stats.SystemUsage = systemUsage
  108. pair.publisher.Publish(stats)
  109. }
  110. }
  111. }
  112. const nanoSecondsPerSecond = 1e9
  113. // getSystemCPUUsage returns the host system's cpu usage in
  114. // nanoseconds. An error is returned if the format of the underlying
  115. // file does not match.
  116. //
  117. // Uses /proc/stat defined by POSIX. Looks for the cpu
  118. // statistics line and then sums up the first seven fields
  119. // provided. See `man 5 proc` for details on specific field
  120. // information.
  121. func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
  122. var line string
  123. f, err := os.Open("/proc/stat")
  124. if err != nil {
  125. return 0, err
  126. }
  127. defer func() {
  128. s.bufReader.Reset(nil)
  129. f.Close()
  130. }()
  131. s.bufReader.Reset(f)
  132. err = nil
  133. for err == nil {
  134. line, err = s.bufReader.ReadString('\n')
  135. if err != nil {
  136. break
  137. }
  138. parts := strings.Fields(line)
  139. switch parts[0] {
  140. case "cpu":
  141. if len(parts) < 8 {
  142. return 0, fmt.Errorf("invalid number of cpu fields")
  143. }
  144. var totalClockTicks uint64
  145. for _, i := range parts[1:8] {
  146. v, err := strconv.ParseUint(i, 10, 64)
  147. if err != nil {
  148. return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err)
  149. }
  150. totalClockTicks += v
  151. }
  152. return (totalClockTicks * nanoSecondsPerSecond) /
  153. s.clockTicksPerSecond, nil
  154. }
  155. }
  156. return 0, fmt.Errorf("invalid stat format")
  157. }