stats_collector_unix.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // +build !windows
  2. package daemon
  3. import (
  4. "bufio"
  5. "os"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/docker/daemon/execdriver"
  12. derr "github.com/docker/docker/errors"
  13. "github.com/docker/docker/pkg/pubsub"
  14. lntypes "github.com/docker/libnetwork/types"
  15. "github.com/opencontainers/runc/libcontainer"
  16. "github.com/opencontainers/runc/libcontainer/system"
  17. )
  18. // newStatsCollector returns a new statsCollector that collections
  19. // network and cgroup stats for a registered container at the specified
  20. // interval. The collector allows non-running containers to be added
  21. // and will start processing stats when they are started.
  22. func newStatsCollector(interval time.Duration) *statsCollector {
  23. s := &statsCollector{
  24. interval: interval,
  25. publishers: make(map[*Container]*pubsub.Publisher),
  26. clockTicksPerSecond: uint64(system.GetClockTicks()),
  27. bufReader: bufio.NewReaderSize(nil, 128),
  28. }
  29. go s.run()
  30. return s
  31. }
  32. // statsCollector manages and provides container resource stats
  33. type statsCollector struct {
  34. m sync.Mutex
  35. interval time.Duration
  36. clockTicksPerSecond uint64
  37. publishers map[*Container]*pubsub.Publisher
  38. bufReader *bufio.Reader
  39. }
  40. // collect registers the container with the collector and adds it to
  41. // the event loop for collection on the specified interval returning
  42. // a channel for the subscriber to receive on.
  43. func (s *statsCollector) collect(c *Container) chan interface{} {
  44. s.m.Lock()
  45. defer s.m.Unlock()
  46. publisher, exists := s.publishers[c]
  47. if !exists {
  48. publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
  49. s.publishers[c] = publisher
  50. }
  51. return publisher.Subscribe()
  52. }
  53. // stopCollection closes the channels for all subscribers and removes
  54. // the container from metrics collection.
  55. func (s *statsCollector) stopCollection(c *Container) {
  56. s.m.Lock()
  57. if publisher, exists := s.publishers[c]; exists {
  58. publisher.Close()
  59. delete(s.publishers, c)
  60. }
  61. s.m.Unlock()
  62. }
  63. // unsubscribe removes a specific subscriber from receiving updates for a container's stats.
  64. func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) {
  65. s.m.Lock()
  66. publisher := s.publishers[c]
  67. if publisher != nil {
  68. publisher.Evict(ch)
  69. if publisher.Len() == 0 {
  70. delete(s.publishers, c)
  71. }
  72. }
  73. s.m.Unlock()
  74. }
  75. func (s *statsCollector) run() {
  76. type publishersPair struct {
  77. container *Container
  78. publisher *pubsub.Publisher
  79. }
  80. // we cannot determine the capacity here.
  81. // it will grow enough in first iteration
  82. var pairs []publishersPair
  83. for range time.Tick(s.interval) {
  84. // it does not make sense in the first iteration,
  85. // but saves allocations in further iterations
  86. pairs = pairs[:0]
  87. s.m.Lock()
  88. for container, publisher := range s.publishers {
  89. // copy pointers here to release the lock ASAP
  90. pairs = append(pairs, publishersPair{container, publisher})
  91. }
  92. s.m.Unlock()
  93. if len(pairs) == 0 {
  94. continue
  95. }
  96. systemUsage, err := s.getSystemCPUUsage()
  97. if err != nil {
  98. logrus.Errorf("collecting system cpu usage: %v", err)
  99. continue
  100. }
  101. for _, pair := range pairs {
  102. stats, err := pair.container.stats()
  103. if err != nil {
  104. if err != execdriver.ErrNotRunning {
  105. logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
  106. }
  107. continue
  108. }
  109. stats.SystemUsage = systemUsage
  110. // Retrieve the nw statistics from libnetwork and inject them in the Stats
  111. if nwStats, err := s.getNetworkStats(pair.container); err == nil {
  112. stats.Interfaces = nwStats
  113. }
  114. pair.publisher.Publish(stats)
  115. }
  116. }
  117. }
  118. const nanoSecondsPerSecond = 1e9
  119. // getSystemCPUUsage returns the host system's cpu usage in
  120. // nanoseconds. An error is returned if the format of the underlying
  121. // file does not match.
  122. //
  123. // Uses /proc/stat defined by POSIX. Looks for the cpu
  124. // statistics line and then sums up the first seven fields
  125. // provided. See `man 5 proc` for details on specific field
  126. // information.
  127. func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
  128. var line string
  129. f, err := os.Open("/proc/stat")
  130. if err != nil {
  131. return 0, err
  132. }
  133. defer func() {
  134. s.bufReader.Reset(nil)
  135. f.Close()
  136. }()
  137. s.bufReader.Reset(f)
  138. err = nil
  139. for err == nil {
  140. line, err = s.bufReader.ReadString('\n')
  141. if err != nil {
  142. break
  143. }
  144. parts := strings.Fields(line)
  145. switch parts[0] {
  146. case "cpu":
  147. if len(parts) < 8 {
  148. return 0, derr.ErrorCodeBadCPUFields
  149. }
  150. var totalClockTicks uint64
  151. for _, i := range parts[1:8] {
  152. v, err := strconv.ParseUint(i, 10, 64)
  153. if err != nil {
  154. return 0, derr.ErrorCodeBadCPUInt.WithArgs(i, err)
  155. }
  156. totalClockTicks += v
  157. }
  158. return (totalClockTicks * nanoSecondsPerSecond) /
  159. s.clockTicksPerSecond, nil
  160. }
  161. }
  162. return 0, derr.ErrorCodeBadStatFormat
  163. }
  164. func (s *statsCollector) getNetworkStats(c *Container) ([]*libcontainer.NetworkInterface, error) {
  165. var list []*libcontainer.NetworkInterface
  166. sb, err := c.daemon.netController.SandboxByID(c.NetworkSettings.SandboxID)
  167. if err != nil {
  168. return list, err
  169. }
  170. stats, err := sb.Statistics()
  171. if err != nil {
  172. return list, err
  173. }
  174. // Convert libnetwork nw stats into libcontainer nw stats
  175. for ifName, ifStats := range stats {
  176. list = append(list, convertLnNetworkStats(ifName, ifStats))
  177. }
  178. return list, nil
  179. }
  180. func convertLnNetworkStats(name string, stats *lntypes.InterfaceStatistics) *libcontainer.NetworkInterface {
  181. n := &libcontainer.NetworkInterface{Name: name}
  182. n.RxBytes = stats.RxBytes
  183. n.RxPackets = stats.RxPackets
  184. n.RxErrors = stats.RxErrors
  185. n.RxDropped = stats.RxDropped
  186. n.TxBytes = stats.TxBytes
  187. n.TxPackets = stats.TxPackets
  188. n.TxErrors = stats.TxErrors
  189. n.TxDropped = stats.TxDropped
  190. return n
  191. }