stats_collector_unix.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. // +build !windows
  2. package daemon
  3. import (
  4. "bufio"
  5. "fmt"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/docker/container"
  13. "github.com/docker/docker/pkg/pubsub"
  14. "github.com/docker/engine-api/types"
  15. "github.com/opencontainers/runc/libcontainer/system"
  16. )
  17. type statsSupervisor interface {
  18. // GetContainerStats collects all the stats related to a container
  19. GetContainerStats(container *container.Container) (*types.StatsJSON, error)
  20. }
  21. // newStatsCollector returns a new statsCollector that collections
  22. // network and cgroup stats for a registered container at the specified
  23. // interval. The collector allows non-running containers to be added
  24. // and will start processing stats when they are started.
  25. func (daemon *Daemon) newStatsCollector(interval time.Duration) *statsCollector {
  26. s := &statsCollector{
  27. interval: interval,
  28. supervisor: daemon,
  29. publishers: make(map[*container.Container]*pubsub.Publisher),
  30. clockTicksPerSecond: uint64(system.GetClockTicks()),
  31. bufReader: bufio.NewReaderSize(nil, 128),
  32. }
  33. go s.run()
  34. return s
  35. }
  36. // statsCollector manages and provides container resource stats
  37. type statsCollector struct {
  38. m sync.Mutex
  39. supervisor statsSupervisor
  40. interval time.Duration
  41. clockTicksPerSecond uint64
  42. publishers map[*container.Container]*pubsub.Publisher
  43. bufReader *bufio.Reader
  44. }
  45. // collect registers the container with the collector and adds it to
  46. // the event loop for collection on the specified interval returning
  47. // a channel for the subscriber to receive on.
  48. func (s *statsCollector) collect(c *container.Container) chan interface{} {
  49. s.m.Lock()
  50. defer s.m.Unlock()
  51. publisher, exists := s.publishers[c]
  52. if !exists {
  53. publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
  54. s.publishers[c] = publisher
  55. }
  56. return publisher.Subscribe()
  57. }
  58. // stopCollection closes the channels for all subscribers and removes
  59. // the container from metrics collection.
  60. func (s *statsCollector) stopCollection(c *container.Container) {
  61. s.m.Lock()
  62. if publisher, exists := s.publishers[c]; exists {
  63. publisher.Close()
  64. delete(s.publishers, c)
  65. }
  66. s.m.Unlock()
  67. }
  68. // unsubscribe removes a specific subscriber from receiving updates for a container's stats.
  69. func (s *statsCollector) unsubscribe(c *container.Container, ch chan interface{}) {
  70. s.m.Lock()
  71. publisher := s.publishers[c]
  72. if publisher != nil {
  73. publisher.Evict(ch)
  74. if publisher.Len() == 0 {
  75. delete(s.publishers, c)
  76. }
  77. }
  78. s.m.Unlock()
  79. }
  80. func (s *statsCollector) run() {
  81. type publishersPair struct {
  82. container *container.Container
  83. publisher *pubsub.Publisher
  84. }
  85. // we cannot determine the capacity here.
  86. // it will grow enough in first iteration
  87. var pairs []publishersPair
  88. for range time.Tick(s.interval) {
  89. // it does not make sense in the first iteration,
  90. // but saves allocations in further iterations
  91. pairs = pairs[:0]
  92. s.m.Lock()
  93. for container, publisher := range s.publishers {
  94. // copy pointers here to release the lock ASAP
  95. pairs = append(pairs, publishersPair{container, publisher})
  96. }
  97. s.m.Unlock()
  98. if len(pairs) == 0 {
  99. continue
  100. }
  101. systemUsage, err := s.getSystemCPUUsage()
  102. if err != nil {
  103. logrus.Errorf("collecting system cpu usage: %v", err)
  104. continue
  105. }
  106. for _, pair := range pairs {
  107. stats, err := s.supervisor.GetContainerStats(pair.container)
  108. if err != nil {
  109. if _, ok := err.(errNotRunning); !ok {
  110. logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
  111. }
  112. continue
  113. }
  114. // FIXME: move to containerd
  115. stats.CPUStats.SystemUsage = systemUsage
  116. pair.publisher.Publish(stats)
  117. }
  118. }
  119. }
  120. const nanoSecondsPerSecond = 1e9
  121. // getSystemCPUUsage returns the host system's cpu usage in
  122. // nanoseconds. An error is returned if the format of the underlying
  123. // file does not match.
  124. //
  125. // Uses /proc/stat defined by POSIX. Looks for the cpu
  126. // statistics line and then sums up the first seven fields
  127. // provided. See `man 5 proc` for details on specific field
  128. // information.
  129. func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
  130. var line string
  131. f, err := os.Open("/proc/stat")
  132. if err != nil {
  133. return 0, err
  134. }
  135. defer func() {
  136. s.bufReader.Reset(nil)
  137. f.Close()
  138. }()
  139. s.bufReader.Reset(f)
  140. err = nil
  141. for err == nil {
  142. line, err = s.bufReader.ReadString('\n')
  143. if err != nil {
  144. break
  145. }
  146. parts := strings.Fields(line)
  147. switch parts[0] {
  148. case "cpu":
  149. if len(parts) < 8 {
  150. return 0, fmt.Errorf("invalid number of cpu fields")
  151. }
  152. var totalClockTicks uint64
  153. for _, i := range parts[1:8] {
  154. v, err := strconv.ParseUint(i, 10, 64)
  155. if err != nil {
  156. return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err)
  157. }
  158. totalClockTicks += v
  159. }
  160. return (totalClockTicks * nanoSecondsPerSecond) /
  161. s.clockTicksPerSecond, nil
  162. }
  163. }
  164. return 0, fmt.Errorf("invalid stat format. Error trying to parse the '/proc/stat' file")
  165. }