collector.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. // +build !solaris
  2. package stats
  3. import (
  4. "time"
  5. "github.com/Sirupsen/logrus"
  6. "github.com/docker/docker/api/types"
  7. "github.com/docker/docker/container"
  8. "github.com/docker/docker/pkg/pubsub"
  9. )
  10. // Collect registers the container with the collector and adds it to
  11. // the event loop for collection on the specified interval returning
  12. // a channel for the subscriber to receive on.
  13. func (s *Collector) Collect(c *container.Container) chan interface{} {
  14. s.m.Lock()
  15. defer s.m.Unlock()
  16. publisher, exists := s.publishers[c]
  17. if !exists {
  18. publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
  19. s.publishers[c] = publisher
  20. }
  21. return publisher.Subscribe()
  22. }
  23. // StopCollection closes the channels for all subscribers and removes
  24. // the container from metrics collection.
  25. func (s *Collector) StopCollection(c *container.Container) {
  26. s.m.Lock()
  27. if publisher, exists := s.publishers[c]; exists {
  28. publisher.Close()
  29. delete(s.publishers, c)
  30. }
  31. s.m.Unlock()
  32. }
  33. // Unsubscribe removes a specific subscriber from receiving updates for a container's stats.
  34. func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
  35. s.m.Lock()
  36. publisher := s.publishers[c]
  37. if publisher != nil {
  38. publisher.Evict(ch)
  39. if publisher.Len() == 0 {
  40. delete(s.publishers, c)
  41. }
  42. }
  43. s.m.Unlock()
  44. }
  45. // Run starts the collectors and will indefinitely collect stats from the supervisor
  46. func (s *Collector) Run() {
  47. type publishersPair struct {
  48. container *container.Container
  49. publisher *pubsub.Publisher
  50. }
  51. // we cannot determine the capacity here.
  52. // it will grow enough in first iteration
  53. var pairs []publishersPair
  54. for range time.Tick(s.interval) {
  55. // it does not make sense in the first iteration,
  56. // but saves allocations in further iterations
  57. pairs = pairs[:0]
  58. s.m.Lock()
  59. for container, publisher := range s.publishers {
  60. // copy pointers here to release the lock ASAP
  61. pairs = append(pairs, publishersPair{container, publisher})
  62. }
  63. s.m.Unlock()
  64. if len(pairs) == 0 {
  65. continue
  66. }
  67. systemUsage, err := s.getSystemCPUUsage()
  68. if err != nil {
  69. logrus.Errorf("collecting system cpu usage: %v", err)
  70. continue
  71. }
  72. onlineCPUs, err := s.getNumberOnlineCPUs()
  73. if err != nil {
  74. logrus.Errorf("collecting system online cpu count: %v", err)
  75. continue
  76. }
  77. for _, pair := range pairs {
  78. stats, err := s.supervisor.GetContainerStats(pair.container)
  79. if err != nil {
  80. if _, ok := err.(notRunningErr); !ok {
  81. logrus.Errorf("collecting stats for %s: %v", pair.container.ID, err)
  82. continue
  83. }
  84. // publish empty stats containing only name and ID if not running
  85. pair.publisher.Publish(types.StatsJSON{
  86. Name: pair.container.Name,
  87. ID: pair.container.ID,
  88. })
  89. continue
  90. }
  91. // FIXME: move to containerd on Linux (not Windows)
  92. stats.CPUStats.SystemUsage = systemUsage
  93. stats.CPUStats.OnlineCPUs = onlineCPUs
  94. pair.publisher.Publish(*stats)
  95. }
  96. }
  97. }
  // notRunningErr matches errors that also report, via ContainerIsRunning,
// whether the container is running. Run uses a type assertion against this
// interface to recognise such errors from GetContainerStats and publishes
// name/ID-only stats for them. Run does not itself call ContainerIsRunning.
type notRunningErr interface {
	ContainerIsRunning() bool
	error
}