
Merge pull request #32154 from dperny/refactor-logs

Refactor logs and support service logs with TTY
Brian Goff 8 years ago
parent
commit
4a1a64c677

+ 92 - 0
api/server/httputils/write_log_stream.go

@@ -0,0 +1,92 @@
+package httputils
+
+import (
+	"fmt"
+	"io"
+	"sort"
+	"strings"
+
+	"golang.org/x/net/context"
+
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/backend"
+	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/docker/pkg/jsonlog"
+	"github.com/docker/docker/pkg/stdcopy"
+)
+
+// WriteLogStream writes an encoded byte stream of log messages from the
+// messages channel, multiplexing them with a stdcopy.Writer if mux is true
+func WriteLogStream(ctx context.Context, w io.Writer, msgs <-chan *backend.LogMessage, config *types.ContainerLogsOptions, mux bool) {
+	wf := ioutils.NewWriteFlusher(w)
+	defer wf.Close()
+
+	wf.Flush()
+
+	// it might seem clearer to write the below as:
+	//   var outStream io.Writer = wf
+	// however, this GREATLY DISPLEASES golint, and if you do that, it will
+	// fail CI. we need outStream to be declared as an io.Writer because if
+	// we mux streams, we will reassign all of the streams to stdwriters,
+	// which only conform to the io.Writer interface.
+	var outStream io.Writer
+	outStream = wf
+	errStream := outStream
+	sysErrStream := errStream
+	if mux {
+		sysErrStream = stdcopy.NewStdWriter(outStream, stdcopy.Systemerr)
+		errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
+		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
+	}
+
+	for {
+		msg, ok := <-msgs
+		if !ok {
+			return
+		}
+		// check if the message contains an error. if so, write that error
+		// and continue with the next message
+		if msg.Err != nil {
+			fmt.Fprintf(sysErrStream, "Error grabbing logs: %v\n", msg.Err)
+			continue
+		}
+		logLine := msg.Line
+		if config.Details {
+			logLine = append([]byte(stringAttrs(msg.Attrs)+" "), logLine...)
+		}
+		if config.Timestamps {
+			// TODO(dperny) the format is defined in
+			// daemon/logger/logger.go as logger.TimeFormat. importing
+			// logger is verboten (not part of backend), so it is unclear
+			// whether importing the same value from jsonlog is good enough
+			logLine = append([]byte(msg.Timestamp.Format(jsonlog.RFC3339NanoFixed)+" "), logLine...)
+		}
+		if msg.Source == "stdout" && config.ShowStdout {
+			outStream.Write(logLine)
+		}
+		if msg.Source == "stderr" && config.ShowStderr {
+			errStream.Write(logLine)
+		}
+	}
+}
+
+type byKey []string
+
+func (s byKey) Len() int { return len(s) }
+func (s byKey) Less(i, j int) bool {
+	keyI := strings.Split(s[i], "=")
+	keyJ := strings.Split(s[j], "=")
+	return keyI[0] < keyJ[0]
+}
+func (s byKey) Swap(i, j int) {
+	s[i], s[j] = s[j], s[i]
+}
+
+func stringAttrs(a backend.LogAttributes) string {
+	var ss byKey
+	for k, v := range a {
+		ss = append(ss, k+"="+v)
+	}
+	sort.Sort(ss)
+	return strings.Join(ss, ",")
+}
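
As an aside, a minimal sketch of the consumer side of the stream WriteLogStream produces (not part of this change; the daemon address and container ID are illustrative): when mux is true the payload is stdcopy-framed, and stdcopy.StdCopy splits it back into stdout and stderr, surfacing any Systemerr frame as a non-nil error.

package main

import (
	"net/http"
	"os"

	"github.com/docker/docker/pkg/stdcopy"
)

func main() {
	// hypothetical daemon address and container ID
	resp, err := http.Get("http://localhost:2375/containers/abc123/logs?stdout=1&stderr=1")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// for a non-TTY container the body is multiplexed; StdCopy demuxes it
	// into the two writers and returns an error if it reads a Systemerr frame
	if _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, resp.Body); err != nil {
		panic(err)
	}
}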

+ 1 - 1
api/server/router/container/backend.go

@@ -51,7 +51,7 @@ type stateBackend interface {
 type monitorBackend interface {
 	ContainerChanges(name string) ([]archive.Change, error)
 	ContainerInspect(name string, size bool, version string) (interface{}, error)
-	ContainerLogs(ctx context.Context, name string, config *backend.ContainerLogsConfig, started chan struct{}) error
+	ContainerLogs(ctx context.Context, name string, config *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error)
 	ContainerStats(ctx context.Context, name string, config *backend.ContainerStatsConfig) error
 	ContainerTop(name string, psArgs string) (*container.ContainerTopOKBody, error)
 

+ 29 - 24
api/server/router/container/container_routes.go

@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/api"
 	"github.com/docker/docker/api/server/httputils"
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/backend"
@@ -18,7 +19,6 @@ import (
 	"github.com/docker/docker/api/types/versions"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/signal"
-	"github.com/docker/docker/pkg/stdcopy"
 	"golang.org/x/net/context"
 	"golang.org/x/net/websocket"
 )
@@ -91,33 +91,38 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
 	}
 
 	containerName := vars["name"]
-	logsConfig := &backend.ContainerLogsConfig{
-		ContainerLogsOptions: types.ContainerLogsOptions{
-			Follow:     httputils.BoolValue(r, "follow"),
-			Timestamps: httputils.BoolValue(r, "timestamps"),
-			Since:      r.Form.Get("since"),
-			Tail:       r.Form.Get("tail"),
-			ShowStdout: stdout,
-			ShowStderr: stderr,
-			Details:    httputils.BoolValue(r, "details"),
-		},
-		OutStream: w,
+	logsConfig := &types.ContainerLogsOptions{
+		Follow:     httputils.BoolValue(r, "follow"),
+		Timestamps: httputils.BoolValue(r, "timestamps"),
+		Since:      r.Form.Get("since"),
+		Tail:       r.Form.Get("tail"),
+		ShowStdout: stdout,
+		ShowStderr: stderr,
+		Details:    httputils.BoolValue(r, "details"),
+	}
+
+	// it doesn't matter what version the client is on, we're only using this
+	// internally. we also don't need the size, so pass false for it
+	raw, err := s.backend.ContainerInspect(containerName, false, api.DefaultVersion)
+	if err != nil {
+		return err
+	}
+	container, ok := raw.(*types.ContainerJSON)
+	if !ok {
+		// %T prints the type. handy!
+		return fmt.Errorf("expected container to be *types.ContainerJSON but got %T", raw)
 	}
 
-	chStarted := make(chan struct{})
-	if err := s.backend.ContainerLogs(ctx, containerName, logsConfig, chStarted); err != nil {
-		select {
-		case <-chStarted:
-			// The client may be expecting all of the data we're sending to
-			// be multiplexed, so mux it through the Systemerr stream, which
-			// will cause the client to throw an error when demuxing
-			stdwriter := stdcopy.NewStdWriter(logsConfig.OutStream, stdcopy.Systemerr)
-			fmt.Fprintf(stdwriter, "Error running logs job: %v\n", err)
-		default:
-			return err
-		}
+	msgs, err := s.backend.ContainerLogs(ctx, containerName, logsConfig)
+	if err != nil {
+		return err
 	}
 
+	// if the container has a TTY, we're not muxing streams; if it doesn't,
+	// we are. this is the point of no return for writing a response: once we
+	// call WriteLogStream, the response has started and errors will be
+	// returned in band by WriteLogStream
+	httputils.WriteLogStream(ctx, w, msgs, logsConfig, !container.Config.Tty)
 	return nil
 }
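
The "point of no return" deserves a word: once a handler writes the first byte of the body, the status line and headers have already gone out, so a later failure can no longer become an HTTP error response and must travel in band. A hedged sketch of the general shape (openLogStream is a hypothetical stand-in for the fallible setup work, not a function from this change):

package logsdemo

import (
	"fmt"
	"net/http"

	"github.com/docker/docker/api/types/backend"
	"github.com/docker/docker/pkg/stdcopy"
)

// openLogStream is a hypothetical stand-in for the fallible setup steps
// (inspecting the container, opening the log reader, and so on)
func openLogStream(r *http.Request) (<-chan *backend.LogMessage, error) {
	ch := make(chan *backend.LogMessage)
	close(ch)
	return ch, nil
}

func logsHandler(w http.ResponseWriter, r *http.Request) error {
	msgs, err := openLogStream(r)
	if err != nil {
		// nothing has been written yet: the router can still map this
		// to a proper HTTP status code
		return err
	}
	// the first write to w commits the 200 response; any failure after
	// this point must travel in band, e.g. as a stdcopy Systemerr frame
	sysErr := stdcopy.NewStdWriter(w, stdcopy.Systemerr)
	for msg := range msgs {
		if msg.Err != nil {
			fmt.Fprintf(sysErr, "Error running logs job: %v\n", msg.Err)
			return nil
		}
		w.Write(msg.Line)
	}
	return nil
}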
 

+ 1 - 1
api/server/router/swarm/backend.go

@@ -21,7 +21,7 @@ type Backend interface {
 	CreateService(types.ServiceSpec, string) (*basictypes.ServiceCreateResponse, error)
 	UpdateService(string, uint64, types.ServiceSpec, basictypes.ServiceUpdateOptions) (*basictypes.ServiceUpdateResponse, error)
 	RemoveService(string) error
-	ServiceLogs(context.Context, *backend.LogSelector, *backend.ContainerLogsConfig, chan struct{}) error
+	ServiceLogs(context.Context, *backend.LogSelector, *basictypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error)
 	GetNodes(basictypes.NodeListOptions) ([]types.Node, error)
 	GetNode(string) (types.Node, error)
 	UpdateNode(string, uint64, types.NodeSpec) error

+ 32 - 22
api/server/router/swarm/helpers.go

@@ -7,7 +7,6 @@ import (
 	"github.com/docker/docker/api/server/httputils"
 	basictypes "github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/backend"
-	"github.com/docker/docker/pkg/stdcopy"
 	"golang.org/x/net/context"
 )
 
@@ -24,32 +23,43 @@ func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r *
 		return fmt.Errorf("Bad parameters: you must choose at least one stream")
 	}
 
-	logsConfig := &backend.ContainerLogsConfig{
-		ContainerLogsOptions: basictypes.ContainerLogsOptions{
-			Follow:     httputils.BoolValue(r, "follow"),
-			Timestamps: httputils.BoolValue(r, "timestamps"),
-			Since:      r.Form.Get("since"),
-			Tail:       r.Form.Get("tail"),
-			ShowStdout: stdout,
-			ShowStderr: stderr,
-			Details:    httputils.BoolValue(r, "details"),
-		},
-		OutStream: w,
+	// there is probably a neater way to manufacture the ContainerLogsOptions
+	// struct, perhaps in the caller, to eliminate the dependency on net/http
+	logsConfig := &basictypes.ContainerLogsOptions{
+		Follow:     httputils.BoolValue(r, "follow"),
+		Timestamps: httputils.BoolValue(r, "timestamps"),
+		Since:      r.Form.Get("since"),
+		Tail:       r.Form.Get("tail"),
+		ShowStdout: stdout,
+		ShowStderr: stderr,
+		Details:    httputils.BoolValue(r, "details"),
 	}
 
-	chStarted := make(chan struct{})
-	if err := sr.backend.ServiceLogs(ctx, selector, logsConfig, chStarted); err != nil {
-		select {
-		case <-chStarted:
-			// The client may be expecting all of the data we're sending to
-			// be multiplexed, so send it through OutStream, which will
-			// have been set up to handle that if needed.
-			stdwriter := stdcopy.NewStdWriter(w, stdcopy.Systemerr)
-			fmt.Fprintf(stdwriter, "Error grabbing service logs: %v\n", err)
-		default:
+	tty := false
+	// checking whether the logs are TTY involves iterating over every
+	// service and task; it is not clear there is a better way
+	for _, service := range selector.Services {
+		s, err := sr.backend.GetService(service)
+		if err != nil {
+			// maybe should return some context with this error?
 			return err
 		}
+		tty = s.Spec.TaskTemplate.ContainerSpec.TTY || tty
+	}
+	for _, task := range selector.Tasks {
+		t, err := sr.backend.GetTask(task)
+		if err != nil {
+			// as above
+			return err
+		}
+		tty = t.Spec.ContainerSpec.TTY || tty
+	}
+
+	msgs, err := sr.backend.ServiceLogs(ctx, selector, logsConfig)
+	if err != nil {
+		return err
 	}
 
+	httputils.WriteLogStream(ctx, w, msgs, logsConfig, !tty)
 	return nil
 }

+ 21 - 5
api/types/backend/backend.go

@@ -3,6 +3,7 @@ package backend
 
 import (
 	"io"
+	"time"
 
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/pkg/streamformatter"
@@ -25,13 +26,28 @@ type ContainerAttachConfig struct {
 	MuxStreams bool
 }
 
-// ContainerLogsConfig holds configs for logging operations. Exists
-// for users of the backend to to pass it a logging configuration.
-type ContainerLogsConfig struct {
-	types.ContainerLogsOptions
-	OutStream io.Writer
+// LogMessage is a datastructure that represents a piece of output produced
+// by some container. The Line member is a slice of an array whose contents
+// can be changed after a log driver's Log() method returns.
+// Changes to this struct need to be reflected in the reset method in
+// daemon/logger/logger.go
+type LogMessage struct {
+	Line      []byte
+	Source    string
+	Timestamp time.Time
+	Attrs     LogAttributes
+	Partial   bool
+
+	// Err is an error associated with a message. A message with a non-nil
+	// Err is not expected to be complete; its other fields may be missing,
+	// gibberish, or nil
+	Err error
 }
 
+// LogAttributes is used to hold the extra attributes available in the log message
+// Primarily used for converting the map type to string and sorting.
+type LogAttributes map[string]string
+
 // LogSelector is a list of services and tasks that should be returned as part
 // of a log stream. It is similar to swarmapi.LogSelector, with the difference
 // that the names don't have to be resolved to IDs; this is mostly to avoid
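
The Err field establishes an in-band error protocol for the new channel-based API: a producer that hits a fatal error sends one final LogMessage carrying only Err and closes the channel; a consumer checks Err before trusting the other fields. A minimal sketch of both halves under that contract (the src function is a hypothetical stand-in):

package logsdemo

import (
	"fmt"
	"time"

	"github.com/docker/docker/api/types/backend"
)

// produce reads lines from src and reports a fatal failure in-band
func produce(msgs chan<- *backend.LogMessage, src func() ([]byte, error)) {
	defer close(msgs) // closing signals end-of-logs to the consumer
	for {
		line, err := src()
		if err != nil {
			msgs <- &backend.LogMessage{Err: err}
			return
		}
		msgs <- &backend.LogMessage{Line: line, Source: "stdout", Timestamp: time.Now()}
	}
}

// consume drains the channel, treating an Err message as terminal
func consume(msgs <-chan *backend.LogMessage) error {
	for msg := range msgs {
		if msg.Err != nil {
			return msg.Err
		}
		fmt.Printf("%s %s: %s", msg.Timestamp, msg.Source, msg.Line)
	}
	return nil
}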

+ 22 - 0
cli/command/service/logs.go

@@ -73,6 +73,7 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
 		Timestamps: opts.timestamps,
 		Follow:     opts.follow,
 		Tail:       opts.tail,
+		Details:    true,
 	}
 
 	cli := dockerCli.Client()
@@ -80,6 +81,7 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
 	var (
 		maxLength    = 1
 		responseBody io.ReadCloser
+		tty          bool
 	)
 
 	service, _, err := cli.ServiceInspectWithRaw(ctx, opts.target)
@@ -89,6 +91,14 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
 			return err
 		}
 		task, _, err := cli.TaskInspectWithRaw(ctx, opts.target)
+		tty = task.Spec.ContainerSpec.TTY
+		// TODO(dperny) hot fix until we get a nice details system squared away,
+		// ignores details (including task context) if we have a TTY log
+		if tty {
+			options.Details = false
+		}
+
+		responseBody, err = cli.TaskLogs(ctx, opts.target, options)
 		if err != nil {
 			if client.IsErrTaskNotFound(err) {
 				// if the task ALSO isn't found, rewrite the error to be clear
@@ -100,6 +110,13 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
 		maxLength = getMaxLength(task.Slot)
 		responseBody, err = cli.TaskLogs(ctx, opts.target, options)
 	} else {
+		tty = service.Spec.TaskTemplate.ContainerSpec.TTY
+		// TODO(dperny) hot fix until we get a nice details system squared away,
+		// ignores details (including task context) if we have a TTY log
+		if tty {
+			options.Details = false
+		}
+
 		responseBody, err = cli.ServiceLogs(ctx, opts.target, options)
 		if err != nil {
 			return err
@@ -112,6 +129,11 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error {
 	}
 	defer responseBody.Close()
 
+	if tty {
+		_, err = io.Copy(dockerCli.Out(), responseBody)
+		return err
+	}
+
 	taskFormatter := newTaskFormatter(cli, opts, maxLength)
 
 	stdout := &logWriter{ctx: ctx, opts: opts, f: taskFormatter, w: dockerCli.Out()}

+ 1 - 1
daemon/cluster/executor/backend.go

@@ -33,7 +33,7 @@ type Backend interface {
 	CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error)
 	ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error
 	ContainerStop(name string, seconds *int) error
-	ContainerLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error
+	ContainerLogs(context.Context, string, *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error)
 	ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
 	ActivateContainerServiceBinding(containerName string) error
 	DeactivateContainerServiceBinding(containerName string) error

+ 14 - 28
daemon/cluster/executor/container/adapter.go

@@ -396,26 +396,15 @@ func (c *containerAdapter) deactivateServiceBinding() error {
 	return c.backend.DeactivateContainerServiceBinding(c.container.name())
 }
 
-func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) {
-	// we can't handle the peculiarities of a TTY-attached container yet
-	conf := c.container.config()
-	if conf != nil && conf.Tty {
-		return nil, errors.New("logs not supported on containers with a TTY attached")
-	}
-
-	reader, writer := io.Pipe()
-
-	apiOptions := &backend.ContainerLogsConfig{
-		ContainerLogsOptions: types.ContainerLogsOptions{
-			Follow: options.Follow,
-
-			// TODO(stevvooe): Parse timestamp out of message. This
-			// absolutely needs to be done before going to production with
-			// this, at it is completely redundant.
-			Timestamps: true,
-			Details:    false, // no clue what to do with this, let's just deprecate it.
-		},
-		OutStream: writer,
+func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) {
+	apiOptions := &types.ContainerLogsOptions{
+		Follow: options.Follow,
+
+		// TODO(stevvooe): Parse timestamp out of message. This
+		// absolutely needs to be done before going to production with
+		// this, at it is completely redundant.
+		Timestamps: true,
+		Details:    false, // no clue what to do with this, let's just deprecate it.
 	}
 
 	if options.Since != nil {
@@ -449,14 +438,11 @@ func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscription
 			}
 		}
 	}
-
-	chStarted := make(chan struct{})
-	go func() {
-		defer writer.Close()
-		c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted)
-	}()
-
-	return reader, nil
+	msgs, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions)
+	if err != nil {
+		return nil, err
+	}
+	return msgs, nil
 }
 
 // todo: typed/wrapped errors

+ 22 - 40
daemon/cluster/executor/container/controller.go

@@ -1,11 +1,7 @@
 package container
 
 import (
-	"bufio"
-	"bytes"
-	"encoding/binary"
 	"fmt"
-	"io"
 	"os"
 	"strconv"
 	"strings"
@@ -445,11 +441,12 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti
 		return errors.Wrap(err, "container not ready for logs")
 	}
 
-	rc, err := r.adapter.logs(ctx, options)
+	logsContext, cancel := context.WithCancel(ctx)
+	msgs, err := r.adapter.logs(logsContext, options)
+	defer cancel()
 	if err != nil {
 		return errors.Wrap(err, "failed getting container logs")
 	}
-	defer rc.Close()
 
 	var (
 		// use a rate limiter to keep things under control but also provides some
@@ -462,53 +459,38 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti
 		}
 	)
 
-	brd := bufio.NewReader(rc)
 	for {
-		// so, message header is 8 bytes, treat as uint64, pull stream off MSB
-		var header uint64
-		if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
-			if err == io.EOF {
-				return nil
-			}
-
-			return errors.Wrap(err, "failed reading log header")
-		}
-
-		stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3))
-
-		// limit here to decrease allocation back pressure.
-		if err := limiter.WaitN(ctx, int(size)); err != nil {
-			return errors.Wrap(err, "failed rate limiter")
-		}
-
-		buf := make([]byte, size)
-		_, err := io.ReadFull(brd, buf)
-		if err != nil {
-			return errors.Wrap(err, "failed reading buffer")
+		msg, ok := <-msgs
+		if !ok {
+			// we're done here, no more messages
+			return nil
 		}
 
-		// Timestamp is RFC3339Nano with 1 space after. Lop, parse, publish
-		parts := bytes.SplitN(buf, []byte(" "), 2)
-		if len(parts) != 2 {
-			return fmt.Errorf("invalid timestamp in log message: %v", buf)
+		if msg.Err != nil {
+			// the deferred cancel closes the adapter's log stream
+			return msg.Err
 		}
 
-		ts, err := time.Parse(time.RFC3339Nano, string(parts[0]))
-		if err != nil {
-			return errors.Wrap(err, "failed to parse timestamp")
+		// wait here for the limiter to catch up
+		if err := limiter.WaitN(ctx, len(msg.Line)); err != nil {
+			return errors.Wrap(err, "failed rate limiter")
 		}
-
-		tsp, err := gogotypes.TimestampProto(ts)
+		tsp, err := gogotypes.TimestampProto(msg.Timestamp)
 		if err != nil {
 			return errors.Wrap(err, "failed to convert timestamp")
 		}
+		var stream api.LogStream
+		if msg.Source == "stdout" {
+			stream = api.LogStreamStdout
+		} else if msg.Source == "stderr" {
+			stream = api.LogStreamStderr
+		}
 
 		if err := publisher.Publish(ctx, api.LogMessage{
 			Context:   msgctx,
 			Timestamp: tsp,
-			Stream:    api.LogStream(stream),
-
-			Data: parts[1],
+			Stream:    stream,
+			Data:      msg.Line,
 		}); err != nil {
 			return errors.Wrap(err, "failed to publish log message")
 		}
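
For reference, the bit math deleted above decoded the stdcopy frame header: 8 bytes, where byte 0 is the stream ID, bytes 1-3 are padding, and bytes 4-7 hold the big-endian payload length. Read as a single big-endian uint64, shifting by 7<<3 = 56 isolates the stream byte, and masking off the top byte leaves the size. A standalone decode sketch:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// a stdcopy header: stream 2 (stderr), three padding bytes, size 13
	hdr := []byte{2, 0, 0, 0, 0, 0, 0, 13}
	header := binary.BigEndian.Uint64(hdr)
	stream := (header >> 56) & 0xFF        // top byte: stream ID
	size := header &^ (uint64(0xFF) << 56) // remaining bytes: payload length
	fmt.Println(stream, size)              // prints: 2 13
}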

+ 74 - 99
daemon/cluster/services.go

@@ -18,9 +18,6 @@ import (
 	types "github.com/docker/docker/api/types/swarm"
 	timetypes "github.com/docker/docker/api/types/time"
 	"github.com/docker/docker/daemon/cluster/convert"
-	"github.com/docker/docker/daemon/logger"
-	"github.com/docker/docker/pkg/ioutils"
-	"github.com/docker/docker/pkg/stdcopy"
 	runconfigopts "github.com/docker/docker/runconfig/opts"
 	swarmapi "github.com/docker/swarmkit/api"
 	gogotypes "github.com/gogo/protobuf/types"
@@ -303,56 +300,44 @@ func (c *Cluster) RemoveService(input string) error {
 }
 
 // ServiceLogs collects service logs and writes them back to `config.OutStream`
-func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *backend.ContainerLogsConfig, started chan struct{}) error {
+func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *apitypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error) {
 	c.mu.RLock()
-	defer func() {
-		select {
-		case <-started:
-			// if we've started streaming logs, we are no longer holding the
-			// lock and do not have to release it
-			return
-		default:
-			// before we start, though, we're holding this lock and it needs to
-			// be released
-			c.mu.RUnlock()
-		}
-	}()
+	defer c.mu.RUnlock()
+
 	state := c.currentNodeState()
 	if !state.IsActiveManager() {
-		return c.errNoManager(state)
+		return nil, c.errNoManager(state)
 	}
 
-	swarmSelector, tty, err := convertSelector(ctx, state.controlClient, selector)
+	swarmSelector, err := convertSelector(ctx, state.controlClient, selector)
 	if err != nil {
-		return errors.Wrap(err, "error making log selector")
-	}
-
-	// TODO(dperny) this goes away when we support TTY logs, which is in the works
-	if tty {
-		return errors.New("service logs not supported on tasks with a TTY attached")
+		return nil, errors.Wrap(err, "error making log selector")
 	}
 
 	// set the streams we'll use
 	stdStreams := []swarmapi.LogStream{}
-	if config.ContainerLogsOptions.ShowStdout {
+	if config.ShowStdout {
 		stdStreams = append(stdStreams, swarmapi.LogStreamStdout)
 	}
-	if config.ContainerLogsOptions.ShowStderr {
+	if config.ShowStderr {
 		stdStreams = append(stdStreams, swarmapi.LogStreamStderr)
 	}
 
 	// Get tail value squared away - the number of previous log lines we look at
 	var tail int64
+	// in ContainerLogs, if the tail value is ANYTHING non-integer, we just
+	// set it to -1 (all). i don't agree with that, but passing no tail at
+	// all should be legitimate: if you don't pass tail, we assume "all"
 	if config.Tail == "all" || config.Tail == "" {
 		// tail of 0 means send all logs on the swarmkit side
 		tail = 0
 	} else {
 		t, err := strconv.Atoi(config.Tail)
 		if err != nil {
-			return errors.New("tail value must be a positive integer or \"all\"")
+			return nil, errors.New("tail value must be a positive integer or \"all\"")
 		}
 		if t < 0 {
-			return errors.New("negative tail values not supported")
+			return nil, errors.New("negative tail values not supported")
 		}
 		// we actually use negative tail in swarmkit to represent messages
 		// backwards starting from the beginning. also, -1 means no logs. so,
@@ -370,12 +355,12 @@ func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector
 	if config.Since != "" {
 		s, n, err := timetypes.ParseTimestamps(config.Since, 0)
 		if err != nil {
-			return errors.Wrap(err, "could not parse since timestamp")
+			return nil, errors.Wrap(err, "could not parse since timestamp")
 		}
 		since := time.Unix(s, n)
 		sinceProto, err = gogotypes.TimestampProto(since)
 		if err != nil {
-			return errors.Wrap(err, "could not parse timestamp to proto")
+			return nil, errors.Wrap(err, "could not parse timestamp to proto")
 		}
 	}
 
@@ -389,106 +374,96 @@ func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector
 		},
 	})
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	wf := ioutils.NewWriteFlusher(config.OutStream)
-	defer wf.Close()
-
-	// Release the lock before starting the stream.
-	//
-	// this feels like it could be racy because we would double unlock if we
-	// somehow returned right after we unlocked but before we closed, but I do
-	// not think such a thing is possible. i wish it were possible to atomically
-	// close and unlock but that might be overkill. programming is hard.
-	c.mu.RUnlock()
-	close(started)
-
-	wf.Flush()
-
-	outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
-	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)
-
-	for {
-		// Check the context before doing anything.
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-
-		subscribeMsg, err := stream.Recv()
-		if err == io.EOF {
-			return nil
-		}
-		if err != nil {
-			return err
-		}
-
-		for _, msg := range subscribeMsg.Messages {
-			data := []byte{}
-
-			if config.Timestamps {
-				ts, err := gogotypes.TimestampFromProto(msg.Timestamp)
-				if err != nil {
-					return err
+	messageChan := make(chan *backend.LogMessage, 1)
+	go func() {
+		defer close(messageChan)
+		for {
+			// Check the context before doing anything.
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+			subscribeMsg, err := stream.Recv()
+			if err == io.EOF {
+				return
+			}
+			// if the error is not io.EOF, push an error message and return
+			if err != nil {
+				select {
+				case <-ctx.Done():
+				case messageChan <- &backend.LogMessage{Err: err}:
 				}
-				data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
+				return
 			}
 
-			data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
-				contextPrefix, msg.Context.NodeID,
-				contextPrefix, msg.Context.ServiceID,
-				contextPrefix, msg.Context.TaskID,
-			))...)
-
-			data = append(data, msg.Data...)
-
-			switch msg.Stream {
-			case swarmapi.LogStreamStdout:
-				outStream.Write(data)
-			case swarmapi.LogStreamStderr:
-				errStream.Write(data)
+			for _, msg := range subscribeMsg.Messages {
+				// make a new message
+				m := new(backend.LogMessage)
+				m.Attrs = make(backend.LogAttributes)
+				// add the timestamp, adding the error if it fails
+				m.Timestamp, err = gogotypes.TimestampFromProto(msg.Timestamp)
+				if err != nil {
+					m.Err = err
+				}
+				m.Attrs[contextPrefix+".node.id"] = msg.Context.NodeID
+				m.Attrs[contextPrefix+".service.id"] = msg.Context.ServiceID
+				m.Attrs[contextPrefix+".task.id"] = msg.Context.TaskID
+				switch msg.Stream {
+				case swarmapi.LogStreamStdout:
+					m.Source = "stdout"
+				case swarmapi.LogStreamStderr:
+					m.Source = "stderr"
+				}
+				m.Line = msg.Data
+
+				// there could be a case where the reader stops accepting
+				// messages and the context is canceled. we need to check that
+				// here, or otherwise we risk blocking forever on the message
+				// send.
+				select {
+				case <-ctx.Done():
+					return
+				case messageChan <- m:
+				}
 			}
 		}
-	}
+	}()
+	return messageChan, nil
 }
 
 // convertSelector takes a backend.LogSelector, which contains raw names that
 // may or may not be valid, and converts them to an api.LogSelector proto. It
-// also returns a boolean, true if any of the services use a TTY (false
-// otherwise) and an error if something fails
-func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, bool, error) {
-	// if ANY tasks use a TTY, don't mux streams
-	var tty bool
+// returns an error if something fails
+func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, error) {
 	// don't rely on swarmkit to resolve IDs, do it ourselves
 	swarmSelector := &swarmapi.LogSelector{}
 	for _, s := range selector.Services {
 		service, err := getService(ctx, cc, s)
 		if err != nil {
-			return nil, false, err
+			return nil, err
 		}
 		c := service.Spec.Task.GetContainer()
 		if c == nil {
-			return nil, false, errors.New("logs only supported on container tasks")
+			return nil, errors.New("logs only supported on container tasks")
 		}
-		// set TTY true if we have a TTY service, or if it's already true
-		tty = tty || c.TTY
 		swarmSelector.ServiceIDs = append(swarmSelector.ServiceIDs, service.ID)
 	}
 	for _, t := range selector.Tasks {
 		task, err := getTask(ctx, cc, t)
 		if err != nil {
-			return nil, false, err
+			return nil, err
 		}
 		c := task.Spec.GetContainer()
 		if c == nil {
-			return nil, false, errors.New("logs only supported on container tasks")
+			return nil, errors.New("logs only supported on container tasks")
 		}
-		tty = tty || c.TTY
 		swarmSelector.TaskIDs = append(swarmSelector.TaskIDs, task.ID)
 	}
-	return swarmSelector, tty, nil
+	return swarmSelector, nil
 }
 
 // imageWithDigestString takes an image such as name or name:tag

+ 17 - 30
daemon/logger/logger.go

@@ -9,11 +9,10 @@ package logger
 
 import (
 	"errors"
-	"sort"
-	"strings"
 	"sync"
 	"time"
 
+	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/pkg/jsonlog"
 )
 
@@ -43,14 +42,13 @@ func PutMessage(msg *Message) {
 // Message is datastructure that represents piece of output produced by some
 // container.  The Line member is a slice of an array whose contents can be
 // changed after a log driver's Log() method returns.
+//
+// Message is subtyped from backend.LogMessage because there is a lot of
+// internal complexity around the Message type that should not be exposed
+// to any package not explicitly importing the logger type.
+//
 // Any changes made to this struct must also be updated in the `reset` function
-type Message struct {
-	Line      []byte
-	Source    string
-	Timestamp time.Time
-	Attrs     LogAttributes
-	Partial   bool
-}
+type Message backend.LogMessage
 
 // reset sets the message back to default values
 // This is used when putting a message back into the message pool.
@@ -60,32 +58,21 @@ func (m *Message) reset() {
 	m.Source = ""
 	m.Attrs = nil
 	m.Partial = false
-}
 
-// LogAttributes is used to hold the extra attributes available in the log message
-// Primarily used for converting the map type to string and sorting.
-type LogAttributes map[string]string
-type byKey []string
-
-func (s byKey) Len() int { return len(s) }
-func (s byKey) Less(i, j int) bool {
-	keyI := strings.Split(s[i], "=")
-	keyJ := strings.Split(s[j], "=")
-	return keyI[0] < keyJ[0]
-}
-func (s byKey) Swap(i, j int) {
-	s[i], s[j] = s[j], s[i]
+	m.Err = nil
 }
 
-func (a LogAttributes) String() string {
-	var ss byKey
-	for k, v := range a {
-		ss = append(ss, k+"="+v)
-	}
-	sort.Sort(ss)
-	return strings.Join(ss, ",")
+// AsLogMessage returns a pointer to the message as a pointer to
+// backend.LogMessage, which is an identical type with a different purpose
+func (m *Message) AsLogMessage() *backend.LogMessage {
+	return (*backend.LogMessage)(m)
 }
 
+// LogAttributes is used to hold the extra attributes available in the log message
+// Primarily used for converting the map type to string and sorting.
+// Imported here so it can be used internally with less refactoring
+type LogAttributes backend.LogAttributes
+
 // Logger is the interface for docker logging drivers.
 type Logger interface {
 	Log(*Message) error
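
The Message/backend.LogMessage split leans on a Go rule worth spelling out: because Message is defined with backend.LogMessage as its underlying type, a pointer to one converts directly to a pointer to the other with no copy, which is exactly what AsLogMessage does. A toy illustration of the same rule (the types here are invented for the example):

package main

import "fmt"

type inner struct{ Line []byte }

// outer has inner as its underlying type, so pointer conversion is legal
type outer inner

func main() {
	m := &outer{Line: []byte("hello")}
	p := (*inner)(m)            // zero-copy: both pointers share the same memory
	p.Line[0] = 'H'             // mutates the shared backing array
	fmt.Println(string(m.Line)) // prints: Hello
}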

+ 89 - 65
daemon/logs.go

@@ -2,48 +2,57 @@ package daemon
 
 import (
 	"errors"
-	"io"
 	"strconv"
 	"time"
 
 	"golang.org/x/net/context"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/backend"
 	containertypes "github.com/docker/docker/api/types/container"
 	timetypes "github.com/docker/docker/api/types/time"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/daemon/logger"
-	"github.com/docker/docker/pkg/ioutils"
-	"github.com/docker/docker/pkg/stdcopy"
 )
 
-// ContainerLogs hooks up a container's stdout and stderr streams
-// configured with the given struct.
-func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error {
+// ContainerLogs returns a channel of log messages for the requested
+// container. If ContainerLogs returns an error, no messages have been
+// copied and the channel will be closed without data.
+//
+// if the error is nil, the returned channel will be active and deliver
+// log messages until they run out or the context is canceled.
+func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error) {
+	lg := logrus.WithFields(logrus.Fields{
+		"module":    "daemon",
+		"method":    "(*Daemon).ContainerLogs",
+		"container": containerName,
+	})
+
 	if !(config.ShowStdout || config.ShowStderr) {
-		return errors.New("You must choose at least one stream")
+		return nil, errors.New("You must choose at least one stream")
 	}
 	container, err := daemon.GetContainer(containerName)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if container.RemovalInProgress || container.Dead {
-		return errors.New("can not get logs from container which is dead or marked for removal")
+		return nil, errors.New("can not get logs from container which is dead or marked for removal")
 	}
 
 	if container.HostConfig.LogConfig.Type == "none" {
-		return logger.ErrReadLogsNotSupported
+		return nil, logger.ErrReadLogsNotSupported
 	}
 
 	cLog, err := daemon.getLogger(container)
 	if err != nil {
-		return err
+		return nil, err
 	}
+
 	logReader, ok := cLog.(logger.LogReader)
 	if !ok {
-		return logger.ErrReadLogsNotSupported
+		return nil, logger.ErrReadLogsNotSupported
 	}
 
 	follow := config.Follow && container.IsRunning()
@@ -52,76 +61,91 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
 		tailLines = -1
 	}
 
-	logrus.Debug("logs: begin stream")
-
 	var since time.Time
 	if config.Since != "" {
 		s, n, err := timetypes.ParseTimestamps(config.Since, 0)
 		if err != nil {
-			return err
+			return nil, err
 		}
 		since = time.Unix(s, n)
 	}
+
 	readConfig := logger.ReadConfig{
 		Since:  since,
 		Tail:   tailLines,
 		Follow: follow,
 	}
-	logs := logReader.ReadLogs(readConfig)
-	// Close logWatcher on exit
-	defer func() {
-		logs.Close()
-		if cLog != container.LogDriver {
-			// Since the logger isn't cached in the container, which
-			// occurs if it is running, it must get explicitly closed
-			// here to avoid leaking it and any file handles it has.
-			if err := cLog.Close(); err != nil {
-				logrus.Errorf("Error closing logger: %v", err)
-			}
-		}
-	}()
 
-	wf := ioutils.NewWriteFlusher(config.OutStream)
-	defer wf.Close()
-	close(started)
-	wf.Flush()
-
-	var outStream io.Writer
-	outStream = wf
-	errStream := outStream
-	if !container.Config.Tty {
-		errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
-		outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
-	}
+	logs := logReader.ReadLogs(readConfig)
 
-	for {
-		select {
-		case err := <-logs.Err:
-			logrus.Errorf("Error streaming logs: %v", err)
-			return nil
-		case <-ctx.Done():
-			logrus.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
-			return nil
-		case msg, ok := <-logs.Msg:
-			if !ok {
-				logrus.Debug("logs: end stream")
-				return nil
-			}
-			logLine := msg.Line
-			if config.Details {
-				logLine = append([]byte(msg.Attrs.String()+" "), logLine...)
-			}
-			if config.Timestamps {
-				logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...)
+	// past this point, we can't possibly return any errors, so we can just
+	// start a goroutine and return to tell the caller not to expect errors
+	// (if the caller wants to give up on logs, they have to cancel the context)
+	// this goroutine functions as a shim between the logger and the caller.
+	messageChan := make(chan *backend.LogMessage, 1)
+	go func() {
+		// set up some defers
+		defer func() {
+			// ok so this function, originally, was placed right after that
+			// logger.ReadLogs call above. I THINK that means it sets off the
+			// chain of events that results in the logger needing to be closed.
+			// i do not know if an error in time parsing above causing an early
+			// return will result in leaking the logger. if that is the case,
+			// it would also have been a bug in the original code
+			logs.Close()
+			if cLog != container.LogDriver {
+				// Since the logger isn't cached in the container, which
+				// occurs if it is running, it must get explicitly closed
+				// here to avoid leaking it and any file handles it has.
+				if err := cLog.Close(); err != nil {
+					logrus.Errorf("Error closing logger: %v", err)
+				}
 			}
-			if msg.Source == "stdout" && config.ShowStdout {
-				outStream.Write(logLine)
-			}
-			if msg.Source == "stderr" && config.ShowStderr {
-				errStream.Write(logLine)
+		}()
+		// close the messages channel. closing is the only way to signal
+		// above that we're done with logs (other than context cancel, i guess)
+		defer close(messageChan)
+
+		lg.Debug("begin logs")
+		for {
+			select {
+			// as the system is currently designed, i do not believe any
+			// error is possible, but we should be prepared to handle one
+			// anyway. if we do get an error, copy only the error field to a
+			// new object so we don't end up with partial data in the others
+			case err := <-logs.Err:
+				lg.Errorf("Error streaming logs: %v", err)
+				select {
+				case <-ctx.Done():
+				case messageChan <- &backend.LogMessage{Err: err}:
+				}
+				return
+			case <-ctx.Done():
+				lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
+				return
+			case msg, ok := <-logs.Msg:
+				// there is some kind of pool or ring buffer in the logger that
+				// produces these messages, and a possible future optimization
+				// might be to use that pool and reuse message objects
+				if !ok {
+					lg.Debug("end logs")
+					return
+				}
+				m := msg.AsLogMessage() // just a pointer conversion, does not copy data
+
+				// there could be a case where the reader stops accepting
+				// messages and the context is canceled. we need to check that
+				// here, or otherwise we risk blocking forever on the message
+				// send.
+				select {
+				case <-ctx.Done():
+					return
+				case messageChan <- m:
+				}
 			}
 		}
-	}
+	}()
+	return messageChan, nil
 }
 
 func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger, error) {
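
A hedged sketch of how a caller is expected to drive the new ContainerLogs signature (the container name is illustrative): cancel the context to tear down the daemon-side goroutine, treat a closed channel as end-of-logs, and check Err on each message.

package logsdemo

import (
	"os"

	"golang.org/x/net/context"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/daemon"
)

func streamLogs(d *daemon.Daemon, name string) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancellation stops the daemon-side pump goroutine

	msgs, err := d.ContainerLogs(ctx, name, &types.ContainerLogsOptions{
		ShowStdout: true,
		Follow:     true,
	})
	if err != nil {
		return err // nothing has been streamed yet; report normally
	}
	for msg := range msgs { // channel closes when logs end or ctx is done
		if msg.Err != nil {
			return msg.Err
		}
		os.Stdout.Write(msg.Line)
	}
	return nil
}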

+ 40 - 0
integration-cli/docker_cli_service_logs_experimental_test.go

@@ -254,3 +254,43 @@ func (s *DockerSwarmSuite) TestServiceLogsTaskLogs(c *check.C) {
 		}
 	}
 }
+
+func (s *DockerSwarmSuite) TestServiceLogsTTY(c *check.C) {
+	testRequires(c, ExperimentalDaemon)
+
+	d := s.AddDaemon(c, true, true)
+
+	name := "TestServiceLogsTTY"
+
+	result := icmd.RunCmd(d.Command(
+		// create a service
+		"service", "create",
+		// name it $name
+		"--name", name,
+		// use a TTY
+		"-t",
+		// busybox image, shell string
+		"busybox", "sh", "-c",
+		// echo to stdout and stderr
+		"echo out; (echo err 1>&2); sleep 10000",
+	))
+
+	result.Assert(c, icmd.Expected{})
+	id := strings.TrimSpace(result.Stdout())
+	c.Assert(id, checker.Not(checker.Equals), "")
+	// so, right here, we're basically inspecting by id and returning only
+	// the ID. if they don't match, the service doesn't exist.
+	result = icmd.RunCmd(d.Command("service", "inspect", "--format=\"{{.ID}}\"", id))
+	result.Assert(c, icmd.Expected{Out: id})
+
+	// make sure task has been deployed.
+	waitAndAssert(c, defaultReconciliationTimeout, d.CheckActiveContainerCount, checker.Equals, 1)
+	// and make sure we have all the log lines
+	waitAndAssert(c, defaultReconciliationTimeout, countLogLines(d, name), checker.Equals, 2)
+
+	cmd := d.Command("service", "logs", name)
+	result = icmd.RunCmd(cmd)
+	// the output contains carriage returns because the service runs with a
+	// TTY: the pty converts \n to \r\n, so this is expected
+	c.Assert(result, icmd.Matches, icmd.Expected{Out: "out\r\nerr\r\n"})
+}