Browse Source

Implement container stats collection in daemon

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
Michael Crosby 10 năm trước cách đây
mục cha
commit
65f58e2a74

+ 14 - 0
api/server/server.go

@@ -411,6 +411,19 @@ func getContainersJSON(eng *engine.Engine, version version.Version, w http.Respo
 	return nil
 }
 
+// getContainersStats serves the "/containers/{name}/stats" route. After the
+// request form has been parsed it creates the "container_stats" engine job
+// for the container named by vars["name"], hands the job and the response
+// writer to streamJSON, and runs the job.
+//
+// It returns the form-parsing error, a "Missing parameter" error when vars is
+// nil, or whatever error the job run produces.
+func getContainersStats(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
+	err := parseForm(r)
+	if err != nil {
+		return err
+	}
+	if vars == nil {
+		return fmt.Errorf("Missing parameter")
+	}
+	statsJob := eng.Job("container_stats", vars["name"])
+	streamJSON(statsJob, w, true)
+	return statsJob.Run()
+}
+
 func getContainersLogs(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 	if err := parseForm(r); err != nil {
 		return err
@@ -1323,6 +1336,7 @@ func createRouter(eng *engine.Engine, logging, enableCors bool, dockerVersion st
 			"/containers/{name:.*}/json":      getContainersByName,
 			"/containers/{name:.*}/top":       getContainersTop,
 			"/containers/{name:.*}/logs":      getContainersLogs,
+			"/containers/{name:.*}/stats":     getContainersStats,
 			"/containers/{name:.*}/attach/ws": wsContainersAttach,
 			"/exec/{id:.*}/json":              getExecByID,
 		},

+ 7 - 0
daemon/container.go

@@ -1414,3 +1414,10 @@ func (container *Container) getNetworkedContainer() (*Container, error) {
 		return nil, fmt.Errorf("network mode not set to container")
 	}
 }
+
+// Stats returns the current resource stats of the container as reported by
+// its daemon. A container that is not running yields an error and no stats.
+func (container *Container) Stats() (*execdriver.ResourceStats, error) {
+	if container.IsRunning() {
+		return container.daemon.Stats(container)
+	}
+	return nil, fmt.Errorf("cannot collect stats on a non running container")
+}

+ 16 - 0
daemon/daemon.go

@@ -104,6 +104,7 @@ type Daemon struct {
 	driver         graphdriver.Driver
 	execDriver     execdriver.Driver
 	trustStore     *trust.TrustStore
+	statsCollector *statsCollector
 }
 
 // Install installs daemon capabilities to eng.
@@ -116,6 +117,7 @@ func (daemon *Daemon) Install(eng *engine.Engine) error {
 		"container_copy":    daemon.ContainerCopy,
 		"container_rename":  daemon.ContainerRename,
 		"container_inspect": daemon.ContainerInspect,
+		"container_stats":   daemon.ContainerStats,
 		"containers":        daemon.Containers,
 		"create":            daemon.ContainerCreate,
 		"rm":                daemon.ContainerRm,
@@ -982,6 +984,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine) (*Daemon, error)
 		execDriver:     ed,
 		eng:            eng,
 		trustStore:     t,
+		statsCollector: newStatsCollector(1 * time.Second),
 	}
 	if err := daemon.restore(); err != nil {
 		return nil, err
@@ -1092,6 +1095,19 @@ func (daemon *Daemon) Kill(c *Container, sig int) error {
 	return daemon.execDriver.Kill(c.command, sig)
 }
 
+// Stats asks the daemon's exec driver for the resource stats of container c,
+// identified by c.ID, and returns the driver's result unchanged.
+func (daemon *Daemon) Stats(c *Container) (*execdriver.ResourceStats, error) {
+	ed := daemon.execDriver
+	return ed.Stats(c.ID)
+}
+
+// SubscribeToContainerStats looks up the container called name and registers
+// it with the daemon's stats collector. The returned receive-only channel
+// delivers the stats samples the collector gathers for that container.
+//
+// A "no such container" error is returned when the lookup finds nothing.
+func (daemon *Daemon) SubscribeToContainerStats(name string) (<-chan *execdriver.ResourceStats, error) {
+	if c := daemon.Get(name); c != nil {
+		return daemon.statsCollector.collect(c), nil
+	}
+	return nil, fmt.Errorf("no such container")
+}
+
 // Nuke kills all containers then removes all content
 // from the content root, including images, volumes and
 // container filesystems.

+ 9 - 0
daemon/execdriver/driver.go

@@ -5,7 +5,9 @@ import (
 	"io"
 	"os"
 	"os/exec"
+	"time"
 
+	"github.com/docker/libcontainer"
 	"github.com/docker/libcontainer/devices"
 )
 
@@ -61,6 +63,7 @@ type Driver interface {
 	GetPidsForContainer(id string) ([]int, error) // Returns a list of pids for the given container.
 	Terminate(c *Command) error                   // kill it with fire
 	Clean(id string) error                        // clean all traces of container exec
+	Stats(id string) (*ResourceStats, error)      // Get resource stats for a running container
 }
 
 // Network settings of the container
@@ -101,6 +104,12 @@ type Resources struct {
 	Cpuset     string `json:"cpuset"`
 }
 
+// ResourceStats is one resource usage sample for a container. It embeds the
+// libcontainer stats and adds metadata recorded alongside them.
+type ResourceStats struct {
+	*libcontainer.ContainerStats
+	// Read is the timestamp attached to the sample; serialized as "read".
+	// The native driver sets it to the time taken just before gathering stats.
+	Read       time.Time `json:"read"`
+	// ClockTicks is serialized as "clock_ticks". The native driver fills it
+	// from system.GetClockTicks().
+	// NOTE(review): presumably clock ticks per second for CPU math — confirm.
+	ClockTicks int       `json:"clock_ticks"`
+}
+
 type Mount struct {
 	Source      string `json:"source"`
 	Destination string `json:"destination"`

+ 5 - 0
daemon/execdriver/lxc/driver.go

@@ -524,3 +524,8 @@ func (t *TtyConsole) Close() error {
 func (d *driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessConfig, pipes *execdriver.Pipes, startCallback execdriver.StartCallback) (int, error) {
 	return -1, ErrExec
 }
+
+// Stats always returns an error: this driver does not implement collection
+// of container resource stats. The id argument is ignored.
+func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) {
+	return nil, fmt.Errorf("container stats are not supported with LXC")
+}

+ 18 - 0
daemon/execdriver/native/driver.go

@@ -13,6 +13,7 @@ import (
 	"strings"
 	"sync"
 	"syscall"
+	"time"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/execdriver"
@@ -279,6 +280,23 @@ func (d *driver) Clean(id string) error {
 	return os.RemoveAll(filepath.Join(d.root, id))
 }
 
+// Stats gathers resource stats for the container whose state is stored under
+// d.root/id. The libcontainer state is loaded first, then the stats are read
+// from it; the sample's Read timestamp is taken immediately before the stats
+// are gathered.
+//
+// Any error from loading the state or reading the stats is returned as-is.
+func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) {
+	containerDir := filepath.Join(d.root, id)
+	state, err := libcontainer.GetState(containerDir)
+	if err != nil {
+		return nil, err
+	}
+
+	// Capture the timestamp before the read so it marks the start of sampling.
+	readAt := time.Now()
+	containerStats, err := libcontainer.GetStats(nil, state)
+	if err != nil {
+		return nil, err
+	}
+
+	sample := &execdriver.ResourceStats{
+		ContainerStats: containerStats,
+		ClockTicks:     system.GetClockTicks(),
+		Read:           readAt,
+	}
+	return sample, nil
+}
+
 func getEnv(key string, env []string) string {
 	for _, pair := range env {
 		parts := strings.Split(pair, "=")

+ 15 - 0
daemon/start.go

@@ -1,6 +1,7 @@
 package daemon
 
 import (
+	"encoding/json"
 	"fmt"
 	"os"
 	"strings"
@@ -77,3 +78,17 @@ func (daemon *Daemon) setHostConfig(container *Container, hostConfig *runconfig.
 
 	return nil
 }
+
+// ContainerStats is the handler installed for the "container_stats" engine
+// job. job.Args[0] names the container to watch; every stats update received
+// from the subscription is written to job.Stdout as one JSON value.
+//
+// It returns an error status when no container name was supplied, when the
+// subscription cannot be created, or when encoding an update fails. It
+// returns engine.StatusOK only once the subscription channel is closed.
+func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status {
+	// Guard the index below: a job started without arguments would otherwise
+	// panic instead of reporting an error to the caller.
+	if len(job.Args) == 0 {
+		return job.Error(fmt.Errorf("usage: container_stats CONTAINER"))
+	}
+	stats, err := daemon.SubscribeToContainerStats(job.Args[0])
+	if err != nil {
+		return job.Error(err)
+	}
+	enc := json.NewEncoder(job.Stdout)
+	for update := range stats {
+		// An encode failure (for example a closed output) ends the stream.
+		if err := enc.Encode(update); err != nil {
+			return job.Error(err)
+		}
+	}
+	return engine.StatusOK
+}

+ 71 - 0
daemon/stats_collector.go

@@ -0,0 +1,71 @@
+package daemon
+
+import (
+	"sync"
+	"time"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/docker/docker/daemon/execdriver"
+)
+
+// newStatsCollector builds a collector that samples on the given interval,
+// starting with no registered containers. The background collection loop is
+// started before the collector is returned.
+func newStatsCollector(interval time.Duration) *statsCollector {
+	collector := new(statsCollector)
+	collector.interval = interval
+	collector.containers = make(map[string]*statsCollectorData)
+	collector.start()
+	return collector
+}
+
+// statsCollectorData is the collector's record for one registered container.
+type statsCollectorData struct {
+	// c is the container whose Stats method the collection loop calls.
+	c         *Container
+	// lastStats is neither read nor written by the code in this file.
+	// NOTE(review): presumably meant to cache the latest sample — confirm.
+	lastStats *execdriver.ResourceStats
+	// subs holds the channels that receive each sample collected for c.
+	subs      []chan *execdriver.ResourceStats
+}
+
+// statsCollector manages and provides container resource stats. A background
+// loop started by start samples every registered container once per interval
+// and sends each sample to that container's subscriber channels.
+type statsCollector struct {
+	// m is held around every access to containers in this file.
+	m          sync.Mutex
+	// interval is the period between collection passes.
+	interval   time.Duration
+	// containers maps a container ID to its collection record.
+	containers map[string]*statsCollectorData
+}
+
+// collect registers a new subscriber for container c and returns the channel
+// on which that subscriber receives collected samples. The channel is
+// buffered with room for 1024 samples.
+//
+// When c is already being collected, the new channel is added to the existing
+// subscriber list rather than replacing the entry, so earlier subscribers
+// keep receiving samples.
+func (s *statsCollector) collect(c *Container) <-chan *execdriver.ResourceStats {
+	s.m.Lock()
+	defer s.m.Unlock()
+
+	ch := make(chan *execdriver.ResourceStats, 1024)
+	if d, ok := s.containers[c.ID]; ok {
+		d.subs = append(d.subs, ch)
+		return ch
+	}
+	s.containers[c.ID] = &statsCollectorData{
+		c:    c,
+		subs: []chan *execdriver.ResourceStats{ch},
+	}
+	return ch
+}
+
+// stopCollection removes container c, keyed by c.ID, from the set of
+// containers sampled by the collection loop. Removing a container that is
+// not registered is a no-op.
+func (s *statsCollector) stopCollection(c *Container) {
+	s.m.Lock()
+	defer s.m.Unlock()
+	delete(s.containers, c.ID)
+}
+
+// start launches the background collection loop. Once per interval the loop
+// takes the collector lock, asks every registered container for its stats,
+// and offers the sample to each of that container's subscriber channels.
+//
+// A container whose stats cannot be read is logged and skipped for that pass.
+// A subscriber whose buffer is full misses the sample: the send never blocks,
+// because blocking here would happen with the lock held and would stall
+// collection for every container as well as any caller waiting on the lock.
+func (s *statsCollector) start() {
+	go func() {
+		for _ = range time.Tick(s.interval) {
+			log.Debugf("starting collection of container stats")
+			s.m.Lock()
+			for id, d := range s.containers {
+				stats, err := d.c.Stats()
+				if err != nil {
+					// TODO: @crosbymichael evict container depending on error
+					log.Errorf("collecting stats for %s: %v", id, err)
+					continue
+				}
+				for _, sub := range d.subs {
+					select {
+					case sub <- stats:
+					default:
+						// Subscriber is not draining its channel; drop this
+						// sample instead of wedging the collector.
+					}
+				}
+			}
+			s.m.Unlock()
+		}
+	}()
+}