Kaynağa Gözat

Move stream flushes to backend

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Brian Goff 9 yıl önce
ebeveyn
işleme
ae4ee974e8

+ 1 - 2
api/server/router/container/backend.go

@@ -44,14 +44,13 @@ type stateBackend interface {
 	ContainerUnpause(name string) error
 	ContainerUpdate(name string, hostConfig *container.HostConfig) ([]string, error)
 	ContainerWait(name string, timeout time.Duration) (int, error)
-	Exists(id string) bool
 }
 
 // monitorBackend includes functions to implement to provide containers monitoring functionality.
 type monitorBackend interface {
 	ContainerChanges(name string) ([]archive.Change, error)
 	ContainerInspect(name string, size bool, version version.Version) (interface{}, error)
-	ContainerLogs(name string, config *backend.ContainerLogsConfig) error
+	ContainerLogs(name string, config *backend.ContainerLogsConfig, started chan struct{}) error
 	ContainerStats(name string, config *backend.ContainerStatsConfig) error
 	ContainerTop(name string, psArgs string) (*types.ContainerProcessList, error)
 

+ 13 - 35
api/server/router/container/container_routes.go

@@ -3,7 +3,6 @@ package container
 import (
 	"encoding/json"
 	"fmt"
-	"io"
 	"net/http"
 	"strconv"
 	"strings"
@@ -15,7 +14,6 @@ import (
 	"github.com/docker/docker/api/server/httputils"
 	"github.com/docker/docker/api/types/backend"
 	derr "github.com/docker/docker/errors"
-	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/signal"
 	"github.com/docker/docker/pkg/term"
 	"github.com/docker/docker/runconfig"
@@ -66,14 +64,8 @@ func (s *containerRouter) getContainersStats(ctx context.Context, w http.Respons
 	}
 
 	stream := httputils.BoolValueOrDefault(r, "stream", true)
-	var out io.Writer
 	if !stream {
 		w.Header().Set("Content-Type", "application/json")
-		out = w
-	} else {
-		wf := ioutils.NewWriteFlusher(w)
-		out = wf
-		defer wf.Close()
 	}
 
 	var closeNotifier <-chan bool
@@ -83,7 +75,7 @@ func (s *containerRouter) getContainersStats(ctx context.Context, w http.Respons
 
 	config := &backend.ContainerStatsConfig{
 		Stream:    stream,
-		OutStream: out,
+		OutStream: w,
 		Stop:      closeNotifier,
 		Version:   string(httputils.VersionFromContext(ctx)),
 	}
@@ -112,22 +104,6 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
 	}
 
 	containerName := vars["name"]
-
-	if !s.backend.Exists(containerName) {
-		return derr.ErrorCodeNoSuchContainer.WithArgs(containerName)
-	}
-
-	// write an empty chunk of data (this is to ensure that the
-	// HTTP Response is sent immediately, even if the container has
-	// not yet produced any data)
-	w.WriteHeader(http.StatusOK)
-	if flusher, ok := w.(http.Flusher); ok {
-		flusher.Flush()
-	}
-
-	output := ioutils.NewWriteFlusher(w)
-	defer output.Close()
-
 	logsConfig := &backend.ContainerLogsConfig{
 		ContainerLogsOptions: types.ContainerLogsOptions{
 			Follow:     httputils.BoolValue(r, "follow"),
@@ -137,15 +113,21 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
 			ShowStdout: stdout,
 			ShowStderr: stderr,
 		},
-		OutStream: output,
+		OutStream: w,
 		Stop:      closeNotifier,
 	}
 
-	if err := s.backend.ContainerLogs(containerName, logsConfig); err != nil {
-		// The client may be expecting all of the data we're sending to
-		// be multiplexed, so send it through OutStream, which will
-		// have been set up to handle that if needed.
-		fmt.Fprintf(logsConfig.OutStream, "Error running logs job: %s\n", utils.GetErrorMessage(err))
+	chStarted := make(chan struct{})
+	if err := s.backend.ContainerLogs(containerName, logsConfig, chStarted); err != nil {
+		select {
+		case <-chStarted:
+			// The client may be expecting all of the data we're sending to
+			// be multiplexed, so send it through OutStream, which will
+			// have been set up to handle that if needed.
+			fmt.Fprintf(logsConfig.OutStream, "Error running logs job: %s\n", utils.GetErrorMessage(err))
+		default:
+			return err
+		}
 	}
 
 	return nil
@@ -463,10 +445,6 @@ func (s *containerRouter) wsContainersAttach(ctx context.Context, w http.Respons
 	}
 	containerName := vars["name"]
 
-	if !s.backend.Exists(containerName) {
-		return derr.ErrorCodeNoSuchContainer.WithArgs(containerName)
-	}
-
 	var keys []byte
 	var err error
 	detachKeys := r.FormValue("detachKeys")

+ 1 - 8
api/server/router/system/system_routes.go

@@ -68,16 +68,9 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
 	}
 
 	w.Header().Set("Content-Type", "application/json")
-
-	// This is to ensure that the HTTP status code is sent immediately,
-	// so that it will not block the receiver.
-	w.WriteHeader(http.StatusOK)
-	if flusher, ok := w.(http.Flusher); ok {
-		flusher.Flush()
-	}
-
 	output := ioutils.NewWriteFlusher(w)
 	defer output.Close()
+	output.Flush()
 
 	enc := json.NewEncoder(output)
 

+ 14 - 9
daemon/logs.go

@@ -11,13 +11,14 @@ import (
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger/jsonfilelog"
 	derr "github.com/docker/docker/errors"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/stdcopy"
 	timetypes "github.com/docker/engine-api/types/time"
 )
 
 // ContainerLogs hooks up a container's stdout and stderr streams
 // configured with the given struct.
-func (daemon *Daemon) ContainerLogs(containerName string, config *backend.ContainerLogsConfig) error {
+func (daemon *Daemon) ContainerLogs(containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error {
 	container, err := daemon.GetContainer(containerName)
 	if err != nil {
 		return derr.ErrorCodeNoSuchContainer.WithArgs(containerName)
@@ -27,14 +28,6 @@ func (daemon *Daemon) ContainerLogs(containerName string, config *backend.Contai
 		return derr.ErrorCodeNeedStream
 	}
 
-	outStream := config.OutStream
-	errStream := outStream
-	if !container.Config.Tty {
-		errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
-		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
-	}
-	config.OutStream = outStream
-
 	cLog, err := daemon.getLogger(container)
 	if err != nil {
 		return err
@@ -67,6 +60,18 @@ func (daemon *Daemon) ContainerLogs(containerName string, config *backend.Contai
 	}
 	logs := logReader.ReadLogs(readConfig)
 
+	wf := ioutils.NewWriteFlusher(config.OutStream)
+	defer wf.Close()
+	close(started)
+	wf.Flush()
+
+	var outStream io.Writer = wf
+	errStream := outStream
+	if !container.Config.Tty {
+		errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
+		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
+	}
+
 	for {
 		select {
 		case err := <-logs.Err:

+ 7 - 5
daemon/stats.go

@@ -7,6 +7,7 @@ import (
 
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/daemon/execdriver"
+	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/version"
 	"github.com/docker/engine-api/types"
 	"github.com/docker/engine-api/types/versions/v1p20"
@@ -31,11 +32,12 @@ func (daemon *Daemon) ContainerStats(prefixOrName string, config *backend.Contai
 		return json.NewEncoder(config.OutStream).Encode(&types.Stats{})
 	}
 
+	outStream := config.OutStream
 	if config.Stream {
-		// Write an empty chunk of data.
-		// This is to ensure that the HTTP status code is sent immediately,
-		// even if the container has not yet produced any data.
-		config.OutStream.Write(nil)
+		wf := ioutils.NewWriteFlusher(outStream)
+		defer wf.Close()
+		wf.Flush()
+		outStream = wf
 	}
 
 	var preCPUStats types.CPUStats
@@ -50,7 +52,7 @@ func (daemon *Daemon) ContainerStats(prefixOrName string, config *backend.Contai
 		return ss
 	}
 
-	enc := json.NewEncoder(config.OutStream)
+	enc := json.NewEncoder(outStream)
 
 	updates := daemon.subscribeToContainerStats(container)
 	defer daemon.unsubscribeToContainerStats(container, updates)