stats_collector_unix.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. // +build !windows
  2. package daemon
  3. import (
  4. "bufio"
  5. "fmt"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/docker/container"
  13. "github.com/docker/docker/daemon/execdriver"
  14. "github.com/docker/docker/pkg/pubsub"
  15. "github.com/opencontainers/runc/libcontainer/system"
  16. )
// statsSupervisor is the minimal surface the stats collector needs from
// the daemon: the ability to produce resource stats for one container.
// *Daemon satisfies it (see newStatsCollector).
type statsSupervisor interface {
	// GetContainerStats collects all the stats related to a container
	GetContainerStats(container *container.Container) (*execdriver.ResourceStats, error)
}
  21. // newStatsCollector returns a new statsCollector that collections
  22. // network and cgroup stats for a registered container at the specified
  23. // interval. The collector allows non-running containers to be added
  24. // and will start processing stats when they are started.
  25. func (daemon *Daemon) newStatsCollector(interval time.Duration) *statsCollector {
  26. s := &statsCollector{
  27. interval: interval,
  28. supervisor: daemon,
  29. publishers: make(map[*container.Container]*pubsub.Publisher),
  30. clockTicksPerSecond: uint64(system.GetClockTicks()),
  31. bufReader: bufio.NewReaderSize(nil, 128),
  32. }
  33. go s.run()
  34. return s
  35. }
// statsCollector manages and provides container resource stats
type statsCollector struct {
	m sync.Mutex // guards publishers
	// supervisor produces the per-container stats (the daemon).
	supervisor statsSupervisor
	// interval between collection runs in the run() loop.
	interval time.Duration
	// clockTicksPerSecond converts /proc/stat clock ticks to nanoseconds.
	clockTicksPerSecond uint64
	// publishers fans collected stats out to subscribers, one per container.
	publishers map[*container.Container]*pubsub.Publisher
	// bufReader is reused across getSystemCPUUsage calls to avoid allocations.
	bufReader *bufio.Reader
}
  45. // collect registers the container with the collector and adds it to
  46. // the event loop for collection on the specified interval returning
  47. // a channel for the subscriber to receive on.
  48. func (s *statsCollector) collect(c *container.Container) chan interface{} {
  49. s.m.Lock()
  50. defer s.m.Unlock()
  51. publisher, exists := s.publishers[c]
  52. if !exists {
  53. publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
  54. s.publishers[c] = publisher
  55. }
  56. return publisher.Subscribe()
  57. }
  58. // stopCollection closes the channels for all subscribers and removes
  59. // the container from metrics collection.
  60. func (s *statsCollector) stopCollection(c *container.Container) {
  61. s.m.Lock()
  62. if publisher, exists := s.publishers[c]; exists {
  63. publisher.Close()
  64. delete(s.publishers, c)
  65. }
  66. s.m.Unlock()
  67. }
  68. // unsubscribe removes a specific subscriber from receiving updates for a container's stats.
  69. func (s *statsCollector) unsubscribe(c *container.Container, ch chan interface{}) {
  70. s.m.Lock()
  71. publisher := s.publishers[c]
  72. if publisher != nil {
  73. publisher.Evict(ch)
  74. if publisher.Len() == 0 {
  75. delete(s.publishers, c)
  76. }
  77. }
  78. s.m.Unlock()
  79. }
  80. func (s *statsCollector) run() {
  81. type publishersPair struct {
  82. container *container.Container
  83. publisher *pubsub.Publisher
  84. }
  85. // we cannot determine the capacity here.
  86. // it will grow enough in first iteration
  87. var pairs []publishersPair
  88. for range time.Tick(s.interval) {
  89. // it does not make sense in the first iteration,
  90. // but saves allocations in further iterations
  91. pairs = pairs[:0]
  92. s.m.Lock()
  93. for container, publisher := range s.publishers {
  94. // copy pointers here to release the lock ASAP
  95. pairs = append(pairs, publishersPair{container, publisher})
  96. }
  97. s.m.Unlock()
  98. if len(pairs) == 0 {
  99. continue
  100. }
  101. systemUsage, err := s.getSystemCPUUsage()
  102. if err != nil {
  103. logrus.Errorf("collecting system cpu usage: %v", err)
  104. continue
  105. }
  106. for _, pair := range pairs {
  107. stats, err := s.supervisor.GetContainerStats(pair.container)
  108. if err != nil {
  109. if err != execdriver.ErrNotRunning {
  110. logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
  111. }
  112. continue
  113. }
  114. stats.SystemUsage = systemUsage
  115. pair.publisher.Publish(stats)
  116. }
  117. }
  118. }
  119. const nanoSecondsPerSecond = 1e9
  120. // getSystemCPUUsage returns the host system's cpu usage in
  121. // nanoseconds. An error is returned if the format of the underlying
  122. // file does not match.
  123. //
  124. // Uses /proc/stat defined by POSIX. Looks for the cpu
  125. // statistics line and then sums up the first seven fields
  126. // provided. See `man 5 proc` for details on specific field
  127. // information.
  128. func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
  129. var line string
  130. f, err := os.Open("/proc/stat")
  131. if err != nil {
  132. return 0, err
  133. }
  134. defer func() {
  135. s.bufReader.Reset(nil)
  136. f.Close()
  137. }()
  138. s.bufReader.Reset(f)
  139. err = nil
  140. for err == nil {
  141. line, err = s.bufReader.ReadString('\n')
  142. if err != nil {
  143. break
  144. }
  145. parts := strings.Fields(line)
  146. switch parts[0] {
  147. case "cpu":
  148. if len(parts) < 8 {
  149. return 0, fmt.Errorf("invalid number of cpu fields")
  150. }
  151. var totalClockTicks uint64
  152. for _, i := range parts[1:8] {
  153. v, err := strconv.ParseUint(i, 10, 64)
  154. if err != nil {
  155. return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err)
  156. }
  157. totalClockTicks += v
  158. }
  159. return (totalClockTicks * nanoSecondsPerSecond) /
  160. s.clockTicksPerSecond, nil
  161. }
  162. }
  163. return 0, fmt.Errorf("invalid stat format. Error trying to parse the '/proc/stat' file")
  164. }