Browse Source

Use condition variable to wake stats collector.

Before the collection goroutine wakes up every 1 second (as configured).
This sleep interval is in case there are no stats to collect we don't
end up in a tight loop.

Instead use a condition variable to signal that a collection is needed.
This prevents us from waking the goroutine needlessly when there is no
one looking for stats.

For now I've kept the sleep just moved it to the end of the loop, which
gives some space between collections.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Brian Goff 5 years ago
parent
commit
e75e6b0e31
1 changed files with 15 additions and 10 deletions
  1. 15 10
      daemon/stats/collector.go

+ 15 - 10
daemon/stats/collector.go

@@ -14,6 +14,7 @@ import (
 // Collector manages and provides container resource stats
 // Collector manages and provides container resource stats
 type Collector struct {
 type Collector struct {
 	m          sync.Mutex
 	m          sync.Mutex
+	cond       *sync.Cond
 	supervisor supervisor
 	supervisor supervisor
 	interval   time.Duration
 	interval   time.Duration
 	publishers map[*container.Container]*pubsub.Publisher
 	publishers map[*container.Container]*pubsub.Publisher
@@ -31,6 +32,7 @@ func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
 		publishers: make(map[*container.Container]*pubsub.Publisher),
 		publishers: make(map[*container.Container]*pubsub.Publisher),
 		bufReader:  bufio.NewReaderSize(nil, 128),
 		bufReader:  bufio.NewReaderSize(nil, 128),
 	}
 	}
+	s.cond = sync.NewCond(&s.m)
 
 
 	platformNewStatsCollector(s)
 	platformNewStatsCollector(s)
 
 
@@ -46,13 +48,16 @@ type supervisor interface {
 // the event loop for collection on the specified interval returning
 // the event loop for collection on the specified interval returning
 // a channel for the subscriber to receive on.
 // a channel for the subscriber to receive on.
 func (s *Collector) Collect(c *container.Container) chan interface{} {
 func (s *Collector) Collect(c *container.Container) chan interface{} {
-	s.m.Lock()
-	defer s.m.Unlock()
+	s.cond.L.Lock()
+	defer s.cond.L.Unlock()
+
 	publisher, exists := s.publishers[c]
 	publisher, exists := s.publishers[c]
 	if !exists {
 	if !exists {
 		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
 		publisher = pubsub.NewPublisher(100*time.Millisecond, 1024)
 		s.publishers[c] = publisher
 		s.publishers[c] = publisher
 	}
 	}
+
+	s.cond.Broadcast()
 	return publisher.Subscribe()
 	return publisher.Subscribe()
 }
 }
 
 
@@ -91,23 +96,21 @@ func (s *Collector) Run() {
 	var pairs []publishersPair
 	var pairs []publishersPair
 
 
 	for {
 	for {
-		// Put sleep at the start so that it will always be hit,
-		// preventing a tight loop if no stats are collected.
-		time.Sleep(s.interval)
+		s.cond.L.Lock()
+		for len(s.publishers) == 0 {
+			s.cond.Wait()
+		}
 
 
 		// it does not make sense in the first iteration,
 		// it does not make sense in the first iteration,
 		// but saves allocations in further iterations
 		// but saves allocations in further iterations
 		pairs = pairs[:0]
 		pairs = pairs[:0]
 
 
-		s.m.Lock()
 		for container, publisher := range s.publishers {
 		for container, publisher := range s.publishers {
 			// copy pointers here to release the lock ASAP
 			// copy pointers here to release the lock ASAP
 			pairs = append(pairs, publishersPair{container, publisher})
 			pairs = append(pairs, publishersPair{container, publisher})
 		}
 		}
-		s.m.Unlock()
-		if len(pairs) == 0 {
-			continue
-		}
+
+		s.cond.L.Unlock()
 
 
 		onlineCPUs, err := s.getNumberOnlineCPUs()
 		onlineCPUs, err := s.getNumberOnlineCPUs()
 		if err != nil {
 		if err != nil {
@@ -149,6 +152,8 @@ func (s *Collector) Run() {
 				})
 				})
 			}
 			}
 		}
 		}
+
+		time.Sleep(s.interval)
 	}
 	}
 }
 }