stats_helpers.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package container
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/engine-api/client"
  12. "github.com/docker/engine-api/types"
  13. "github.com/docker/go-units"
  14. "golang.org/x/net/context"
  15. )
  16. type containerStats struct {
  17. Name string
  18. CPUPercentage float64
  19. Memory float64
  20. MemoryLimit float64
  21. MemoryPercentage float64
  22. NetworkRx float64
  23. NetworkTx float64
  24. BlockRead float64
  25. BlockWrite float64
  26. PidsCurrent uint64
  27. mu sync.Mutex
  28. err error
  29. }
  30. type stats struct {
  31. mu sync.Mutex
  32. cs []*containerStats
  33. }
  34. func (s *stats) add(cs *containerStats) bool {
  35. s.mu.Lock()
  36. defer s.mu.Unlock()
  37. if _, exists := s.isKnownContainer(cs.Name); !exists {
  38. s.cs = append(s.cs, cs)
  39. return true
  40. }
  41. return false
  42. }
  43. func (s *stats) remove(id string) {
  44. s.mu.Lock()
  45. if i, exists := s.isKnownContainer(id); exists {
  46. s.cs = append(s.cs[:i], s.cs[i+1:]...)
  47. }
  48. s.mu.Unlock()
  49. }
  50. func (s *stats) isKnownContainer(cid string) (int, bool) {
  51. for i, c := range s.cs {
  52. if c.Name == cid {
  53. return i, true
  54. }
  55. }
  56. return -1, false
  57. }
  58. func (s *containerStats) Collect(ctx context.Context, cli client.APIClient, streamStats bool, waitFirst *sync.WaitGroup) {
  59. logrus.Debugf("collecting stats for %s", s.Name)
  60. var (
  61. getFirst bool
  62. previousCPU uint64
  63. previousSystem uint64
  64. u = make(chan error, 1)
  65. )
  66. defer func() {
  67. // if error happens and we get nothing of stats, release wait group whatever
  68. if !getFirst {
  69. getFirst = true
  70. waitFirst.Done()
  71. }
  72. }()
  73. responseBody, err := cli.ContainerStats(ctx, s.Name, streamStats)
  74. if err != nil {
  75. s.mu.Lock()
  76. s.err = err
  77. s.mu.Unlock()
  78. return
  79. }
  80. defer responseBody.Close()
  81. dec := json.NewDecoder(responseBody)
  82. go func() {
  83. for {
  84. var v *types.StatsJSON
  85. if err := dec.Decode(&v); err != nil {
  86. dec = json.NewDecoder(io.MultiReader(dec.Buffered(), responseBody))
  87. u <- err
  88. if err == io.EOF {
  89. break
  90. }
  91. time.Sleep(100 * time.Millisecond)
  92. continue
  93. }
  94. var memPercent = 0.0
  95. var cpuPercent = 0.0
  96. // MemoryStats.Limit will never be 0 unless the container is not running and we haven't
  97. // got any data from cgroup
  98. if v.MemoryStats.Limit != 0 {
  99. memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
  100. }
  101. previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
  102. previousSystem = v.PreCPUStats.SystemUsage
  103. cpuPercent = calculateCPUPercent(previousCPU, previousSystem, v)
  104. blkRead, blkWrite := calculateBlockIO(v.BlkioStats)
  105. s.mu.Lock()
  106. s.CPUPercentage = cpuPercent
  107. s.Memory = float64(v.MemoryStats.Usage)
  108. s.MemoryLimit = float64(v.MemoryStats.Limit)
  109. s.MemoryPercentage = memPercent
  110. s.NetworkRx, s.NetworkTx = calculateNetwork(v.Networks)
  111. s.BlockRead = float64(blkRead)
  112. s.BlockWrite = float64(blkWrite)
  113. s.PidsCurrent = v.PidsStats.Current
  114. s.mu.Unlock()
  115. u <- nil
  116. if !streamStats {
  117. return
  118. }
  119. }
  120. }()
  121. for {
  122. select {
  123. case <-time.After(2 * time.Second):
  124. // zero out the values if we have not received an update within
  125. // the specified duration.
  126. s.mu.Lock()
  127. s.CPUPercentage = 0
  128. s.Memory = 0
  129. s.MemoryPercentage = 0
  130. s.MemoryLimit = 0
  131. s.NetworkRx = 0
  132. s.NetworkTx = 0
  133. s.BlockRead = 0
  134. s.BlockWrite = 0
  135. s.PidsCurrent = 0
  136. s.err = errors.New("timeout waiting for stats")
  137. s.mu.Unlock()
  138. // if this is the first stat you get, release WaitGroup
  139. if !getFirst {
  140. getFirst = true
  141. waitFirst.Done()
  142. }
  143. case err := <-u:
  144. if err != nil {
  145. s.mu.Lock()
  146. s.err = err
  147. s.mu.Unlock()
  148. continue
  149. }
  150. s.err = nil
  151. // if this is the first stat you get, release WaitGroup
  152. if !getFirst {
  153. getFirst = true
  154. waitFirst.Done()
  155. }
  156. }
  157. if !streamStats {
  158. return
  159. }
  160. }
  161. }
  162. func (s *containerStats) Display(w io.Writer) error {
  163. s.mu.Lock()
  164. defer s.mu.Unlock()
  165. // NOTE: if you change this format, you must also change the err format below!
  166. format := "%s\t%.2f%%\t%s / %s\t%.2f%%\t%s / %s\t%s / %s\t%d\n"
  167. if s.err != nil {
  168. format = "%s\t%s\t%s / %s\t%s\t%s / %s\t%s / %s\t%s\n"
  169. errStr := "--"
  170. fmt.Fprintf(w, format,
  171. s.Name, errStr, errStr, errStr, errStr, errStr, errStr, errStr, errStr, errStr,
  172. )
  173. err := s.err
  174. return err
  175. }
  176. fmt.Fprintf(w, format,
  177. s.Name,
  178. s.CPUPercentage,
  179. units.BytesSize(s.Memory), units.BytesSize(s.MemoryLimit),
  180. s.MemoryPercentage,
  181. units.HumanSize(s.NetworkRx), units.HumanSize(s.NetworkTx),
  182. units.HumanSize(s.BlockRead), units.HumanSize(s.BlockWrite),
  183. s.PidsCurrent)
  184. return nil
  185. }
  186. func calculateCPUPercent(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
  187. var (
  188. cpuPercent = 0.0
  189. // calculate the change for the cpu usage of the container in between readings
  190. cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
  191. // calculate the change for the entire system between readings
  192. systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
  193. )
  194. if systemDelta > 0.0 && cpuDelta > 0.0 {
  195. cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
  196. }
  197. return cpuPercent
  198. }
  199. func calculateBlockIO(blkio types.BlkioStats) (blkRead uint64, blkWrite uint64) {
  200. for _, bioEntry := range blkio.IoServiceBytesRecursive {
  201. switch strings.ToLower(bioEntry.Op) {
  202. case "read":
  203. blkRead = blkRead + bioEntry.Value
  204. case "write":
  205. blkWrite = blkWrite + bioEntry.Value
  206. }
  207. }
  208. return
  209. }
  210. func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) {
  211. var rx, tx float64
  212. for _, v := range network {
  213. rx += float64(v.RxBytes)
  214. tx += float64(v.TxBytes)
  215. }
  216. return rx, tx
  217. }