Преглед на файлове

fix the goroutine leak in the stats API if the container is not running

Signed-off-by: Shijiang Wei <mountkin@gmail.com>
Shijiang Wei преди 10 години
родител
ревизия
1cbf5a54da
променени са 4 файла, в които са добавени 86 реда и са изтрити 21 реда
  1. 12 1
      api/server/server.go
  2. 36 18
      daemon/stats.go
  3. 36 0
      integration-cli/docker_api_stats_test.go
  4. 2 2
      integration-cli/docker_utils.go

+ 12 - 1
api/server/server.go

@@ -581,7 +581,18 @@ func (s *Server) getContainersStats(version version.Version, w http.ResponseWrit
 		out = ioutils.NewWriteFlusher(w)
 		out = ioutils.NewWriteFlusher(w)
 	}
 	}
 
 
-	return s.daemon.ContainerStats(vars["name"], stream, out)
+	var closeNotifier <-chan bool
+	if notifier, ok := w.(http.CloseNotifier); ok {
+		closeNotifier = notifier.CloseNotify()
+	}
+
+	config := &daemon.ContainerStatsConfig{
+		Stream:    stream,
+		OutStream: out,
+		Stop:      closeNotifier,
+	}
+
+	return s.daemon.ContainerStats(vars["name"], config)
 }
 }
 
 
 func (s *Server) getContainersLogs(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 func (s *Server) getContainersLogs(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {

+ 36 - 18
daemon/stats.go

@@ -8,12 +8,22 @@ import (
 	"github.com/docker/docker/daemon/execdriver"
 	"github.com/docker/docker/daemon/execdriver"
 )
 )
 
 
-func (daemon *Daemon) ContainerStats(name string, stream bool, out io.Writer) error {
+type ContainerStatsConfig struct {
+	Stream    bool
+	OutStream io.Writer
+	Stop      <-chan bool
+}
+
+func (daemon *Daemon) ContainerStats(name string, config *ContainerStatsConfig) error {
 	updates, err := daemon.SubscribeToContainerStats(name)
 	updates, err := daemon.SubscribeToContainerStats(name)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
+	if config.Stream {
+		config.OutStream.Write(nil)
+	}
+
 	var preCpuStats types.CpuStats
 	var preCpuStats types.CpuStats
 	getStat := func(v interface{}) *types.Stats {
 	getStat := func(v interface{}) *types.Stats {
 		update := v.(*execdriver.ResourceStats)
 		update := v.(*execdriver.ResourceStats)
@@ -26,26 +36,34 @@ func (daemon *Daemon) ContainerStats(name string, stream bool, out io.Writer) er
 		return ss
 		return ss
 	}
 	}
 
 
-	enc := json.NewEncoder(out)
+	enc := json.NewEncoder(config.OutStream)
 
 
-	if !stream {
-		// prime the cpu stats so they aren't 0 in the final output
-		s := getStat(<-updates)
+	defer daemon.UnsubscribeToContainerStats(name, updates)
 
 
-		// now pull stats again with the cpu stats primed
-		s = getStat(<-updates)
-		err := enc.Encode(s)
-		daemon.UnsubscribeToContainerStats(name, updates)
-		return err
-	}
+	noStreamFirstFrame := true
+	for {
+		select {
+		case v, ok := <-updates:
+			if !ok {
+				return nil
+			}
+
+			s := getStat(v)
+			if !config.Stream && noStreamFirstFrame {
+				// prime the cpu stats so they aren't 0 in the final output
+				noStreamFirstFrame = false
+				continue
+			}
+
+			if err := enc.Encode(s); err != nil {
+				return err
+			}
 
 
-	for v := range updates {
-		s := getStat(v)
-		if err := enc.Encode(s); err != nil {
-			// TODO: handle the specific broken pipe
-			daemon.UnsubscribeToContainerStats(name, updates)
-			return err
+			if !config.Stream {
+				return nil
+			}
+		case <-config.Stop:
+			return nil
 		}
 		}
 	}
 	}
-	return nil
 }
 }

+ 36 - 0
integration-cli/docker_api_stats_test.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"strings"
 	"strings"
+	"time"
 
 
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types"
 	"github.com/go-check/check"
 	"github.com/go-check/check"
@@ -33,3 +34,38 @@ func (s *DockerSuite) TestCliStatsNoStreamGetCpu(c *check.C) {
 		c.Fatalf("docker stats with no-stream get cpu usage failed: was %v", cpuPercent)
 		c.Fatalf("docker stats with no-stream get cpu usage failed: was %v", cpuPercent)
 	}
 	}
 }
 }
+
+func (s *DockerSuite) TestStoppedContainerStatsGoroutines(c *check.C) {
+	out, _ := dockerCmd(c, "run", "-d", "busybox", "/bin/sh", "-c", "echo 1")
+	id := strings.TrimSpace(out)
+
+	getGoRoutines := func() int {
+		_, body, err := sockRequestRaw("GET", fmt.Sprintf("/info"), nil, "")
+		c.Assert(err, check.IsNil)
+		info := types.Info{}
+		err = json.NewDecoder(body).Decode(&info)
+		c.Assert(err, check.IsNil)
+		body.Close()
+		return info.NGoroutines
+	}
+
+	// When the HTTP connection is closed, the number of goroutines should not increase.
+	routines := getGoRoutines()
+	_, body, err := sockRequestRaw("GET", fmt.Sprintf("/containers/%s/stats", id), nil, "")
+	c.Assert(err, check.IsNil)
+	body.Close()
+
+	t := time.After(30 * time.Second)
+	for {
+		select {
+		case <-t:
+			c.Assert(getGoRoutines() <= routines, check.Equals, true)
+			return
+		default:
+			if n := getGoRoutines(); n <= routines {
+				return
+			}
+			time.Sleep(200 * time.Millisecond)
+		}
+	}
+}

+ 2 - 2
integration-cli/docker_utils.go

@@ -366,8 +366,8 @@ func sockRequestRaw(method, endpoint string, data io.Reader, ct string) (*http.R
 		return nil, nil, fmt.Errorf("could not perform request: %v", err)
 		return nil, nil, fmt.Errorf("could not perform request: %v", err)
 	}
 	}
 	body := ioutils.NewReadCloserWrapper(resp.Body, func() error {
 	body := ioutils.NewReadCloserWrapper(resp.Body, func() error {
-		defer client.Close()
-		return resp.Body.Close()
+		defer resp.Body.Close()
+		return client.Close()
 	})
 	})
 
 
 	return resp, body, nil
 	return resp, body, nil