stats_collector_unix.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. // +build !windows,!solaris
  2. package daemon
  3. import (
  4. "bufio"
  5. "fmt"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/docker/container"
  13. "github.com/docker/docker/pkg/pubsub"
  14. sysinfo "github.com/docker/docker/pkg/system"
  15. "github.com/docker/engine-api/types"
  16. "github.com/opencontainers/runc/libcontainer/system"
  17. )
  18. type statsSupervisor interface {
  19. // GetContainerStats collects all the stats related to a container
  20. GetContainerStats(container *container.Container) (*types.StatsJSON, error)
  21. }
  22. // newStatsCollector returns a new statsCollector that collections
  23. // network and cgroup stats for a registered container at the specified
  24. // interval. The collector allows non-running containers to be added
  25. // and will start processing stats when they are started.
  26. func (daemon *Daemon) newStatsCollector(interval time.Duration) *statsCollector {
  27. s := &statsCollector{
  28. interval: interval,
  29. supervisor: daemon,
  30. publishers: make(map[*container.Container]*pubsub.Publisher),
  31. clockTicksPerSecond: uint64(system.GetClockTicks()),
  32. bufReader: bufio.NewReaderSize(nil, 128),
  33. }
  34. meminfo, err := sysinfo.ReadMemInfo()
  35. if err == nil && meminfo.MemTotal > 0 {
  36. s.machineMemory = uint64(meminfo.MemTotal)
  37. }
  38. go s.run()
  39. return s
  40. }
  41. // statsCollector manages and provides container resource stats
  42. type statsCollector struct {
  43. m sync.Mutex
  44. supervisor statsSupervisor
  45. interval time.Duration
  46. clockTicksPerSecond uint64
  47. publishers map[*container.Container]*pubsub.Publisher
  48. bufReader *bufio.Reader
  49. machineMemory uint64
  50. }
  51. // collect registers the container with the collector and adds it to
  52. // the event loop for collection on the specified interval returning
  53. // a channel for the subscriber to receive on.
  54. func (s *statsCollector) collect(c *container.Container) chan interface{} {
  55. s.m.Lock()
  56. defer s.m.Unlock()
  57. publisher, exists := s.publishers[c]
  58. if !exists {
  59. publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
  60. s.publishers[c] = publisher
  61. }
  62. return publisher.Subscribe()
  63. }
  64. // stopCollection closes the channels for all subscribers and removes
  65. // the container from metrics collection.
  66. func (s *statsCollector) stopCollection(c *container.Container) {
  67. s.m.Lock()
  68. if publisher, exists := s.publishers[c]; exists {
  69. publisher.Close()
  70. delete(s.publishers, c)
  71. }
  72. s.m.Unlock()
  73. }
  74. // unsubscribe removes a specific subscriber from receiving updates for a container's stats.
  75. func (s *statsCollector) unsubscribe(c *container.Container, ch chan interface{}) {
  76. s.m.Lock()
  77. publisher := s.publishers[c]
  78. if publisher != nil {
  79. publisher.Evict(ch)
  80. if publisher.Len() == 0 {
  81. delete(s.publishers, c)
  82. }
  83. }
  84. s.m.Unlock()
  85. }
  86. func (s *statsCollector) run() {
  87. type publishersPair struct {
  88. container *container.Container
  89. publisher *pubsub.Publisher
  90. }
  91. // we cannot determine the capacity here.
  92. // it will grow enough in first iteration
  93. var pairs []publishersPair
  94. for range time.Tick(s.interval) {
  95. // it does not make sense in the first iteration,
  96. // but saves allocations in further iterations
  97. pairs = pairs[:0]
  98. s.m.Lock()
  99. for container, publisher := range s.publishers {
  100. // copy pointers here to release the lock ASAP
  101. pairs = append(pairs, publishersPair{container, publisher})
  102. }
  103. s.m.Unlock()
  104. if len(pairs) == 0 {
  105. continue
  106. }
  107. systemUsage, err := s.getSystemCPUUsage()
  108. if err != nil {
  109. logrus.Errorf("collecting system cpu usage: %v", err)
  110. continue
  111. }
  112. for _, pair := range pairs {
  113. stats, err := s.supervisor.GetContainerStats(pair.container)
  114. if err != nil {
  115. if _, ok := err.(errNotRunning); !ok {
  116. logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
  117. }
  118. continue
  119. }
  120. // FIXME: move to containerd
  121. stats.CPUStats.SystemUsage = systemUsage
  122. pair.publisher.Publish(*stats)
  123. }
  124. }
  125. }
  // nanoSecondsPerSecond is the number of nanoseconds in one second. It is
  // used to convert clock-tick totals into nanoseconds.
  const (
  	nanoSecondsPerSecond = 1e9
  )
  127. // getSystemCPUUsage returns the host system's cpu usage in
  128. // nanoseconds. An error is returned if the format of the underlying
  129. // file does not match.
  130. //
  131. // Uses /proc/stat defined by POSIX. Looks for the cpu
  132. // statistics line and then sums up the first seven fields
  133. // provided. See `man 5 proc` for details on specific field
  134. // information.
  135. func (s *statsCollector) getSystemCPUUsage() (uint64, error) {
  136. var line string
  137. f, err := os.Open("/proc/stat")
  138. if err != nil {
  139. return 0, err
  140. }
  141. defer func() {
  142. s.bufReader.Reset(nil)
  143. f.Close()
  144. }()
  145. s.bufReader.Reset(f)
  146. err = nil
  147. for err == nil {
  148. line, err = s.bufReader.ReadString('\n')
  149. if err != nil {
  150. break
  151. }
  152. parts := strings.Fields(line)
  153. switch parts[0] {
  154. case "cpu":
  155. if len(parts) < 8 {
  156. return 0, fmt.Errorf("invalid number of cpu fields")
  157. }
  158. var totalClockTicks uint64
  159. for _, i := range parts[1:8] {
  160. v, err := strconv.ParseUint(i, 10, 64)
  161. if err != nil {
  162. return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err)
  163. }
  164. totalClockTicks += v
  165. }
  166. return (totalClockTicks * nanoSecondsPerSecond) /
  167. s.clockTicksPerSecond, nil
  168. }
  169. }
  170. return 0, fmt.Errorf("invalid stat format. Error trying to parse the '/proc/stat' file")
  171. }