瀏覽代碼

Merge pull request #20792 from cpuguy83/more_stats_client_cleanup

Fixes issue with stats on start event
Arnaud Porterie 9 年之前
父節點
當前提交
187a2fb403
共有 3 個文件被更改,包括 267 次插入233 次删除
  1. 30 0
      api/client/events.go
  2. 44 233
      api/client/stats.go
  3. 193 0
      api/client/stats_helpers.go

+ 30 - 0
api/client/events.go

@@ -6,10 +6,12 @@ import (
 	"io"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	"golang.org/x/net/context"
 
+	"github.com/Sirupsen/logrus"
 	Cli "github.com/docker/docker/cli"
 	"github.com/docker/docker/opts"
 	"github.com/docker/docker/pkg/jsonlog"
@@ -115,3 +117,31 @@ func printOutput(event eventtypes.Message, output io.Writer) {
 	}
 	fmt.Fprint(output, "\n")
 }
+
+type eventHandler struct {
+	handlers map[string]func(eventtypes.Message)
+	mu       sync.Mutex
+	closed   bool
+}
+
+func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) {
+	w.mu.Lock()
+	w.handlers[action] = h
+	w.mu.Unlock()
+}
+
+// Watch ranges over the passed in event chan and processes the events based on the
+// handlers created for a given action.
+// To stop watching, close the event chan.
+func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
+	for e := range c {
+		w.mu.Lock()
+		h, exists := w.handlers[e.Action]
+		w.mu.Unlock()
+		if !exists {
+			continue
+		}
+		logrus.Debugf("event handler: received event: %v", e)
+		go h(e)
+	}
+}

+ 44 - 233
api/client/stats.go

@@ -1,11 +1,9 @@
 package client
 
 import (
-	"encoding/json"
 	"fmt"
 	"io"
 	"strings"
-	"sync"
 	"text/tabwriter"
 	"time"
 
@@ -15,134 +13,8 @@ import (
 	"github.com/docker/engine-api/types"
 	"github.com/docker/engine-api/types/events"
 	"github.com/docker/engine-api/types/filters"
-	"github.com/docker/go-units"
 )
 
-type containerStats struct {
-	Name             string
-	CPUPercentage    float64
-	Memory           float64
-	MemoryLimit      float64
-	MemoryPercentage float64
-	NetworkRx        float64
-	NetworkTx        float64
-	BlockRead        float64
-	BlockWrite       float64
-	mu               sync.RWMutex
-	err              error
-}
-
-type stats struct {
-	mu sync.Mutex
-	cs []*containerStats
-}
-
-func (s *stats) isKnownContainer(cid string) bool {
-	for _, c := range s.cs {
-		if c.Name == cid {
-			return true
-		}
-	}
-	return false
-}
-
-func (s *containerStats) Collect(cli *DockerCli, streamStats bool) {
-	responseBody, err := cli.client.ContainerStats(context.Background(), s.Name, streamStats)
-	if err != nil {
-		s.mu.Lock()
-		s.err = err
-		s.mu.Unlock()
-		return
-	}
-	defer responseBody.Close()
-
-	var (
-		previousCPU    uint64
-		previousSystem uint64
-		dec            = json.NewDecoder(responseBody)
-		u              = make(chan error, 1)
-	)
-	go func() {
-		for {
-			var v *types.StatsJSON
-			if err := dec.Decode(&v); err != nil {
-				u <- err
-				return
-			}
-
-			var memPercent = 0.0
-			var cpuPercent = 0.0
-
-			// MemoryStats.Limit will never be 0 unless the container is not running and we haven't
-			// got any data from cgroup
-			if v.MemoryStats.Limit != 0 {
-				memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
-			}
-
-			previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
-			previousSystem = v.PreCPUStats.SystemUsage
-			cpuPercent = calculateCPUPercent(previousCPU, previousSystem, v)
-			blkRead, blkWrite := calculateBlockIO(v.BlkioStats)
-			s.mu.Lock()
-			s.CPUPercentage = cpuPercent
-			s.Memory = float64(v.MemoryStats.Usage)
-			s.MemoryLimit = float64(v.MemoryStats.Limit)
-			s.MemoryPercentage = memPercent
-			s.NetworkRx, s.NetworkTx = calculateNetwork(v.Networks)
-			s.BlockRead = float64(blkRead)
-			s.BlockWrite = float64(blkWrite)
-			s.mu.Unlock()
-			u <- nil
-			if !streamStats {
-				return
-			}
-		}
-	}()
-	for {
-		select {
-		case <-time.After(2 * time.Second):
-			// zero out the values if we have not received an update within
-			// the specified duration.
-			s.mu.Lock()
-			s.CPUPercentage = 0
-			s.Memory = 0
-			s.MemoryPercentage = 0
-			s.MemoryLimit = 0
-			s.NetworkRx = 0
-			s.NetworkTx = 0
-			s.BlockRead = 0
-			s.BlockWrite = 0
-			s.mu.Unlock()
-		case err := <-u:
-			if err != nil {
-				s.mu.Lock()
-				s.err = err
-				s.mu.Unlock()
-				return
-			}
-		}
-		if !streamStats {
-			return
-		}
-	}
-}
-
-func (s *containerStats) Display(w io.Writer) error {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-	if s.err != nil {
-		return s.err
-	}
-	fmt.Fprintf(w, "%s\t%.2f%%\t%s / %s\t%.2f%%\t%s / %s\t%s / %s\n",
-		s.Name,
-		s.CPUPercentage,
-		units.HumanSize(s.Memory), units.HumanSize(s.MemoryLimit),
-		s.MemoryPercentage,
-		units.HumanSize(s.NetworkRx), units.HumanSize(s.NetworkTx),
-		units.HumanSize(s.BlockRead), units.HumanSize(s.BlockWrite))
-	return nil
-}
-
 // CmdStats displays a live stream of resource usage statistics for one or more containers.
 //
 // This shows real-time information on CPU usage, memory usage, and network I/O.
@@ -157,27 +29,11 @@ func (cli *DockerCli) CmdStats(args ...string) error {
 
 	names := cmd.Args()
 	showAll := len(names) == 0
-
-	// The containerChan is the central synchronization piece for this function,
-	// and all messages to either add or remove an element to the list of
-	// monitored containers go through this.
-	//
-	//   - When watching all containers, a goroutine subscribes to the events
-	//     API endpoint and messages this channel accordingly.
-	//   - When watching a particular subset of containers, we feed the
-	//     requested list of containers to this channel.
-	//   - For both codepaths, a goroutine is responsible for watching this
-	//     channel and subscribing to the stats API for containers.
-	type containerEvent struct {
-		id    string
-		event string
-		err   error
-	}
-	containerChan := make(chan containerEvent)
+	closeChan := make(chan error)
 
 	// monitorContainerEvents watches for container creation and removal (only
 	// used when calling `docker stats` without arguments).
-	monitorContainerEvents := func(started chan<- struct{}, c chan<- containerEvent) {
+	monitorContainerEvents := func(started chan<- struct{}, c chan events.Message) {
 		f := filters.NewArgs()
 		f.Add("type", "container")
 		options := types.EventsOptions{
@@ -188,91 +44,82 @@ func (cli *DockerCli) CmdStats(args ...string) error {
 		// unblock the main goroutine.
 		close(started)
 		if err != nil {
-			c <- containerEvent{err: err}
+			closeChan <- err
 			return
 		}
 		defer resBody.Close()
+
 		decodeEvents(resBody, func(event events.Message, err error) error {
 			if err != nil {
-				c <- containerEvent{"", "", err}
-			} else {
-				c <- containerEvent{event.ID[:12], event.Action, err}
+				closeChan <- err
+				return nil
 			}
+			c <- event
 			return nil
 		})
 	}
 
+	cStats := stats{}
 	// getContainerList simulates creation event for all previously existing
 	// containers (only used when calling `docker stats` without arguments).
-	getContainerList := func(c chan<- containerEvent) {
+	getContainerList := func() {
 		options := types.ContainerListOptions{
 			All: *all,
 		}
 		cs, err := cli.client.ContainerList(options)
 		if err != nil {
-			containerChan <- containerEvent{"", "", err}
+			closeChan <- err
 		}
-		for _, c := range cs {
-			containerChan <- containerEvent{c.ID[:12], "create", nil}
+		for _, container := range cs {
+			s := &containerStats{Name: container.ID[:12]}
+			cStats.add(s)
+			go s.Collect(cli.client, !*noStream)
 		}
 	}
 
-	// Monitor the containerChan and start collection for each container.
-	cStats := stats{}
-	closeChan := make(chan error)
-	go func(stopChan chan<- error, c <-chan containerEvent) {
-		for {
-			event := <-c
-			if event.err != nil {
-				stopChan <- event.err
-				return
-			}
-			switch event.event {
-			case "create":
-				cStats.mu.Lock()
-				if !cStats.isKnownContainer(event.id) {
-					s := &containerStats{Name: event.id}
-					cStats.cs = append(cStats.cs, s)
-					go s.Collect(cli, !*noStream)
-				}
-				cStats.mu.Unlock()
-			case "stop":
-			case "die":
-				if !*all {
-					var remove int
-					// cStats cannot be O(1) with a map cause ranging over it would cause
-					// containers in stats to move up and down in the list...:(
-					cStats.mu.Lock()
-					for i, s := range cStats.cs {
-						if s.Name == event.id {
-							remove = i
-							break
-						}
-					}
-					cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...)
-					cStats.mu.Unlock()
-				}
-			}
-		}
-	}(closeChan, containerChan)
-
 	if showAll {
 		// If no names were specified, start a long running goroutine which
 		// monitors container events. We make sure we're subscribed before
 		// retrieving the list of running containers to avoid a race where we
 		// would "miss" a creation.
 		started := make(chan struct{})
-		go monitorContainerEvents(started, containerChan)
+		eh := eventHandler{handlers: make(map[string]func(events.Message))}
+		eh.Handle("create", func(e events.Message) {
+			if *all {
+				s := &containerStats{Name: e.ID[:12]}
+				cStats.add(s)
+				go s.Collect(cli.client, !*noStream)
+			}
+		})
+
+		eh.Handle("start", func(e events.Message) {
+			s := &containerStats{Name: e.ID[:12]}
+			cStats.add(s)
+			go s.Collect(cli.client, !*noStream)
+		})
+
+		eh.Handle("die", func(e events.Message) {
+			if !*all {
+				cStats.remove(e.ID[:12])
+			}
+		})
+
+		eventChan := make(chan events.Message)
+		go eh.Watch(eventChan)
+		go monitorContainerEvents(started, eventChan)
+		defer close(eventChan)
 		<-started
 
 		// Start a short-lived goroutine to retrieve the initial list of
 		// containers.
-		go getContainerList(containerChan)
+		go getContainerList()
 	} else {
 		// Artificially send creation events for the containers we were asked to
 		// monitor (same code path than we use when monitoring all containers).
 		for _, name := range names {
-			containerChan <- containerEvent{name, "create", nil}
+			s := &containerStats{Name: name}
+			cStats.add(s)
+			go s.Collect(cli.client, !*noStream)
 		}
 
 		// We don't expect any asynchronous errors: closeChan can be closed.
@@ -304,6 +151,7 @@ func (cli *DockerCli) CmdStats(args ...string) error {
 		}
 		io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\n")
 	}
+
 	for range time.Tick(500 * time.Millisecond) {
 		printHeader()
 		toRemove := []int{}
@@ -343,40 +191,3 @@ func (cli *DockerCli) CmdStats(args ...string) error {
 	}
 	return nil
 }
-
-func calculateCPUPercent(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
-	var (
-		cpuPercent = 0.0
-		// calculate the change for the cpu usage of the container in between readings
-		cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
-		// calculate the change for the entire system between readings
-		systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
-	)
-
-	if systemDelta > 0.0 && cpuDelta > 0.0 {
-		cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
-	}
-	return cpuPercent
-}
-
-func calculateBlockIO(blkio types.BlkioStats) (blkRead uint64, blkWrite uint64) {
-	for _, bioEntry := range blkio.IoServiceBytesRecursive {
-		switch strings.ToLower(bioEntry.Op) {
-		case "read":
-			blkRead = blkRead + bioEntry.Value
-		case "write":
-			blkWrite = blkWrite + bioEntry.Value
-		}
-	}
-	return
-}
-
-func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) {
-	var rx, tx float64
-
-	for _, v := range network {
-		rx += float64(v.RxBytes)
-		tx += float64(v.TxBytes)
-	}
-	return rx, tx
-}

+ 193 - 0
api/client/stats_helpers.go

@@ -0,0 +1,193 @@
+package client
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/docker/engine-api/client"
+	"github.com/docker/engine-api/types"
+	"github.com/docker/go-units"
+	"golang.org/x/net/context"
+)
+
+type containerStats struct {
+	Name             string
+	CPUPercentage    float64
+	Memory           float64
+	MemoryLimit      float64
+	MemoryPercentage float64
+	NetworkRx        float64
+	NetworkTx        float64
+	BlockRead        float64
+	BlockWrite       float64
+	mu               sync.RWMutex
+	err              error
+}
+
+type stats struct {
+	mu sync.Mutex
+	cs []*containerStats
+}
+
+func (s *stats) add(cs *containerStats) {
+	s.mu.Lock()
+	if _, exists := s.isKnownContainer(cs.Name); !exists {
+		s.cs = append(s.cs, cs)
+	}
+	s.mu.Unlock()
+}
+
+func (s *stats) remove(id string) {
+	s.mu.Lock()
+	if i, exists := s.isKnownContainer(id); exists {
+		s.cs = append(s.cs[:i], s.cs[i+1:]...)
+	}
+	s.mu.Unlock()
+}
+
+func (s *stats) isKnownContainer(cid string) (int, bool) {
+	for i, c := range s.cs {
+		if c.Name == cid {
+			return i, true
+		}
+	}
+	return -1, false
+}
+
+func (s *containerStats) Collect(cli client.APIClient, streamStats bool) {
+	responseBody, err := cli.ContainerStats(context.Background(), s.Name, streamStats)
+	if err != nil {
+		s.mu.Lock()
+		s.err = err
+		s.mu.Unlock()
+		return
+	}
+	defer responseBody.Close()
+
+	var (
+		previousCPU    uint64
+		previousSystem uint64
+		dec            = json.NewDecoder(responseBody)
+		u              = make(chan error, 1)
+	)
+	go func() {
+		for {
+			var v *types.StatsJSON
+			if err := dec.Decode(&v); err != nil {
+				u <- err
+				return
+			}
+
+			var memPercent = 0.0
+			var cpuPercent = 0.0
+
+			// MemoryStats.Limit will never be 0 unless the container is not running and we haven't
+			// got any data from cgroup
+			if v.MemoryStats.Limit != 0 {
+				memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
+			}
+
+			previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
+			previousSystem = v.PreCPUStats.SystemUsage
+			cpuPercent = calculateCPUPercent(previousCPU, previousSystem, v)
+			blkRead, blkWrite := calculateBlockIO(v.BlkioStats)
+			s.mu.Lock()
+			s.CPUPercentage = cpuPercent
+			s.Memory = float64(v.MemoryStats.Usage)
+			s.MemoryLimit = float64(v.MemoryStats.Limit)
+			s.MemoryPercentage = memPercent
+			s.NetworkRx, s.NetworkTx = calculateNetwork(v.Networks)
+			s.BlockRead = float64(blkRead)
+			s.BlockWrite = float64(blkWrite)
+			s.mu.Unlock()
+			u <- nil
+			if !streamStats {
+				return
+			}
+		}
+	}()
+	for {
+		select {
+		case <-time.After(2 * time.Second):
+			// zero out the values if we have not received an update within
+			// the specified duration.
+			s.mu.Lock()
+			s.CPUPercentage = 0
+			s.Memory = 0
+			s.MemoryPercentage = 0
+			s.MemoryLimit = 0
+			s.NetworkRx = 0
+			s.NetworkTx = 0
+			s.BlockRead = 0
+			s.BlockWrite = 0
+			s.mu.Unlock()
+		case err := <-u:
+			if err != nil {
+				s.mu.Lock()
+				s.err = err
+				s.mu.Unlock()
+				return
+			}
+		}
+		if !streamStats {
+			return
+		}
+	}
+}
+
+func (s *containerStats) Display(w io.Writer) error {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	if s.err != nil {
+		return s.err
+	}
+	fmt.Fprintf(w, "%s\t%.2f%%\t%s / %s\t%.2f%%\t%s / %s\t%s / %s\n",
+		s.Name,
+		s.CPUPercentage,
+		units.HumanSize(s.Memory), units.HumanSize(s.MemoryLimit),
+		s.MemoryPercentage,
+		units.HumanSize(s.NetworkRx), units.HumanSize(s.NetworkTx),
+		units.HumanSize(s.BlockRead), units.HumanSize(s.BlockWrite))
+	return nil
+}
+
+func calculateCPUPercent(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
+	var (
+		cpuPercent = 0.0
+		// calculate the change for the cpu usage of the container in between readings
+		cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
+		// calculate the change for the entire system between readings
+		systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
+	)
+
+	if systemDelta > 0.0 && cpuDelta > 0.0 {
+		cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
+	}
+	return cpuPercent
+}
+
+func calculateBlockIO(blkio types.BlkioStats) (blkRead uint64, blkWrite uint64) {
+	for _, bioEntry := range blkio.IoServiceBytesRecursive {
+		switch strings.ToLower(bioEntry.Op) {
+		case "read":
+			blkRead = blkRead + bioEntry.Value
+		case "write":
+			blkWrite = blkWrite + bioEntry.Value
+		}
+	}
+	return
+}
+
+func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) {
+	var rx, tx float64
+
+	for _, v := range network {
+		rx += float64(v.RxBytes)
+		tx += float64(v.TxBytes)
+	}
+	return rx, tx
+}