diff --git a/api/server/httputils/write_log_stream.go b/api/server/httputils/write_log_stream.go new file mode 100644 index 0000000000..5793a99ff4 --- /dev/null +++ b/api/server/httputils/write_log_stream.go @@ -0,0 +1,92 @@ +package httputils + +import ( + "fmt" + "io" + "sort" + "strings" + + "golang.org/x/net/context" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" + "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/jsonlog" + "github.com/docker/docker/pkg/stdcopy" +) + +// WriteLogStream writes an encoded byte stream of log messages from the +// messages channel, multiplexing them with a stdcopy.Writer if mux is true +func WriteLogStream(ctx context.Context, w io.Writer, msgs <-chan *backend.LogMessage, config *types.ContainerLogsOptions, mux bool) { + wf := ioutils.NewWriteFlusher(w) + defer wf.Close() + + wf.Flush() + + // this might seem like doing below is clear: + // var outStream io.Writer = wf + // however, this GREATLY DISPLEASES golint, and if you do that, it will + // fail CI. we need outstream to be type writer because if we mux streams, + // we will need to reassign all of the streams to be stdwriters, which only + // conforms to the io.Writer interface. + var outStream io.Writer + outStream = wf + errStream := outStream + sysErrStream := errStream + if mux { + sysErrStream = stdcopy.NewStdWriter(outStream, stdcopy.Systemerr) + errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr) + outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) + } + + for { + msg, ok := <-msgs + if !ok { + return + } + // check if the message contains an error. if so, write that error + // and exit + if msg.Err != nil { + fmt.Fprintf(sysErrStream, "Error grabbing logs: %v\n", msg.Err) + continue + } + logLine := msg.Line + if config.Details { + logLine = append([]byte(stringAttrs(msg.Attrs)+" "), logLine...) + } + if config.Timestamps { + // TODO(dperny) the format is defined in + // daemon/logger/logger.go as logger.TimeFormat. importing + // logger is verboten (not part of backend) so idk if just + // importing the same thing from jsonlog is good enough + logLine = append([]byte(msg.Timestamp.Format(jsonlog.RFC3339NanoFixed)+" "), logLine...) + } + if msg.Source == "stdout" && config.ShowStdout { + outStream.Write(logLine) + } + if msg.Source == "stderr" && config.ShowStderr { + errStream.Write(logLine) + } + } +} + +type byKey []string + +func (s byKey) Len() int { return len(s) } +func (s byKey) Less(i, j int) bool { + keyI := strings.Split(s[i], "=") + keyJ := strings.Split(s[j], "=") + return keyI[0] < keyJ[0] +} +func (s byKey) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func stringAttrs(a backend.LogAttributes) string { + var ss byKey + for k, v := range a { + ss = append(ss, k+"="+v) + } + sort.Sort(ss) + return strings.Join(ss, ",") +} diff --git a/api/server/router/container/backend.go b/api/server/router/container/backend.go index 6f729bea16..ce0ee8c9da 100644 --- a/api/server/router/container/backend.go +++ b/api/server/router/container/backend.go @@ -51,7 +51,7 @@ type stateBackend interface { type monitorBackend interface { ContainerChanges(name string) ([]archive.Change, error) ContainerInspect(name string, size bool, version string) (interface{}, error) - ContainerLogs(ctx context.Context, name string, config *backend.ContainerLogsConfig, started chan struct{}) error + ContainerLogs(ctx context.Context, name string, config *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error) ContainerStats(ctx context.Context, name string, config *backend.ContainerStatsConfig) error ContainerTop(name string, psArgs string) (*container.ContainerTopOKBody, error) diff --git a/api/server/router/container/container_routes.go b/api/server/router/container/container_routes.go index 55abf72dcb..bd151ab276 100644 --- a/api/server/router/container/container_routes.go +++ b/api/server/router/container/container_routes.go @@ -10,6 +10,7 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/docker/docker/api" "github.com/docker/docker/api/server/httputils" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" @@ -18,7 +19,6 @@ import ( "github.com/docker/docker/api/types/versions" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/signal" - "github.com/docker/docker/pkg/stdcopy" "golang.org/x/net/context" "golang.org/x/net/websocket" ) @@ -91,33 +91,38 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response } containerName := vars["name"] - logsConfig := &backend.ContainerLogsConfig{ - ContainerLogsOptions: types.ContainerLogsOptions{ - Follow: httputils.BoolValue(r, "follow"), - Timestamps: httputils.BoolValue(r, "timestamps"), - Since: r.Form.Get("since"), - Tail: r.Form.Get("tail"), - ShowStdout: stdout, - ShowStderr: stderr, - Details: httputils.BoolValue(r, "details"), - }, - OutStream: w, + logsConfig := &types.ContainerLogsOptions{ + Follow: httputils.BoolValue(r, "follow"), + Timestamps: httputils.BoolValue(r, "timestamps"), + Since: r.Form.Get("since"), + Tail: r.Form.Get("tail"), + ShowStdout: stdout, + ShowStderr: stderr, + Details: httputils.BoolValue(r, "details"), } - chStarted := make(chan struct{}) - if err := s.backend.ContainerLogs(ctx, containerName, logsConfig, chStarted); err != nil { - select { - case <-chStarted: - // The client may be expecting all of the data we're sending to - // be multiplexed, so mux it through the Systemerr stream, which - // will cause the client to throw an error when demuxing - stdwriter := stdcopy.NewStdWriter(logsConfig.OutStream, stdcopy.Systemerr) - fmt.Fprintf(stdwriter, "Error running logs job: %v\n", err) - default: - return err - } + // doesn't matter what version the client is on, we're using this internally only + // also do we need size? i'm thinkin no we don't + raw, err := s.backend.ContainerInspect(containerName, false, api.DefaultVersion) + if err != nil { + return err + } + container, ok := raw.(*types.ContainerJSON) + if !ok { + // %T prints the type. handy! + return fmt.Errorf("expected container to be *types.ContainerJSON but got %T", raw) } + msgs, err := s.backend.ContainerLogs(ctx, containerName, logsConfig) + if err != nil { + return err + } + + // if has a tty, we're not muxing streams. if it doesn't, we are. simple. + // this is the point of no return for writing a response. once we call + // WriteLogStream, the response has been started and errors will be + // returned in band by WriteLogStream + httputils.WriteLogStream(ctx, w, msgs, logsConfig, !container.Config.Tty) return nil } diff --git a/api/server/router/swarm/backend.go b/api/server/router/swarm/backend.go index 798913b921..28b9a98018 100644 --- a/api/server/router/swarm/backend.go +++ b/api/server/router/swarm/backend.go @@ -21,7 +21,7 @@ type Backend interface { CreateService(types.ServiceSpec, string) (*basictypes.ServiceCreateResponse, error) UpdateService(string, uint64, types.ServiceSpec, basictypes.ServiceUpdateOptions) (*basictypes.ServiceUpdateResponse, error) RemoveService(string) error - ServiceLogs(context.Context, *backend.LogSelector, *backend.ContainerLogsConfig, chan struct{}) error + ServiceLogs(context.Context, *backend.LogSelector, *basictypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error) GetNodes(basictypes.NodeListOptions) ([]types.Node, error) GetNode(string) (types.Node, error) UpdateNode(string, uint64, types.NodeSpec) error diff --git a/api/server/router/swarm/helpers.go b/api/server/router/swarm/helpers.go index e15eead015..af745b84c3 100644 --- a/api/server/router/swarm/helpers.go +++ b/api/server/router/swarm/helpers.go @@ -7,7 +7,6 @@ import ( "github.com/docker/docker/api/server/httputils" basictypes "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" - "github.com/docker/docker/pkg/stdcopy" "golang.org/x/net/context" ) @@ -24,32 +23,43 @@ func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r * return fmt.Errorf("Bad parameters: you must choose at least one stream") } - logsConfig := &backend.ContainerLogsConfig{ - ContainerLogsOptions: basictypes.ContainerLogsOptions{ - Follow: httputils.BoolValue(r, "follow"), - Timestamps: httputils.BoolValue(r, "timestamps"), - Since: r.Form.Get("since"), - Tail: r.Form.Get("tail"), - ShowStdout: stdout, - ShowStderr: stderr, - Details: httputils.BoolValue(r, "details"), - }, - OutStream: w, + // there is probably a neater way to manufacture the ContainerLogsOptions + // struct, probably in the caller, to eliminate the dependency on net/http + logsConfig := &basictypes.ContainerLogsOptions{ + Follow: httputils.BoolValue(r, "follow"), + Timestamps: httputils.BoolValue(r, "timestamps"), + Since: r.Form.Get("since"), + Tail: r.Form.Get("tail"), + ShowStdout: stdout, + ShowStderr: stderr, + Details: httputils.BoolValue(r, "details"), } - chStarted := make(chan struct{}) - if err := sr.backend.ServiceLogs(ctx, selector, logsConfig, chStarted); err != nil { - select { - case <-chStarted: - // The client may be expecting all of the data we're sending to - // be multiplexed, so send it through OutStream, which will - // have been set up to handle that if needed. - stdwriter := stdcopy.NewStdWriter(w, stdcopy.Systemerr) - fmt.Fprintf(stdwriter, "Error grabbing service logs: %v\n", err) - default: + tty := false + // checking for whether logs are TTY involves iterating over every service + // and task. idk if there is a better way + for _, service := range selector.Services { + s, err := sr.backend.GetService(service) + if err != nil { + // maybe should return some context with this error? return err } + tty = s.Spec.TaskTemplate.ContainerSpec.TTY || tty + } + for _, task := range selector.Tasks { + t, err := sr.backend.GetTask(task) + if err != nil { + // as above + return err + } + tty = t.Spec.ContainerSpec.TTY || tty } + msgs, err := sr.backend.ServiceLogs(ctx, selector, logsConfig) + if err != nil { + return err + } + + httputils.WriteLogStream(ctx, w, msgs, logsConfig, !tty) return nil } diff --git a/api/types/backend/backend.go b/api/types/backend/backend.go index 4632c0cd9d..83efae300b 100644 --- a/api/types/backend/backend.go +++ b/api/types/backend/backend.go @@ -3,6 +3,7 @@ package backend import ( "io" + "time" "github.com/docker/docker/api/types" "github.com/docker/docker/pkg/streamformatter" @@ -25,13 +26,28 @@ type ContainerAttachConfig struct { MuxStreams bool } -// ContainerLogsConfig holds configs for logging operations. Exists -// for users of the backend to to pass it a logging configuration. -type ContainerLogsConfig struct { - types.ContainerLogsOptions - OutStream io.Writer +// LogMessage is datastructure that represents piece of output produced by some +// container. The Line member is a slice of an array whose contents can be +// changed after a log driver's Log() method returns. +// changes to this struct need to be reflect in the reset method in +// daemon/logger/logger.go +type LogMessage struct { + Line []byte + Source string + Timestamp time.Time + Attrs LogAttributes + Partial bool + + // Err is an error associated with a message. Completeness of a message + // with Err is not expected, tho it may be partially complete (fields may + // be missing, gibberish, or nil) + Err error } +// LogAttributes is used to hold the extra attributes available in the log message +// Primarily used for converting the map type to string and sorting. +type LogAttributes map[string]string + // LogSelector is a list of services and tasks that should be returned as part // of a log stream. It is similar to swarmapi.LogSelector, with the difference // that the names don't have to be resolved to IDs; this is mostly to avoid diff --git a/cli/command/service/logs.go b/cli/command/service/logs.go index da2374f9dd..cfcb7ed105 100644 --- a/cli/command/service/logs.go +++ b/cli/command/service/logs.go @@ -73,6 +73,7 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { Timestamps: opts.timestamps, Follow: opts.follow, Tail: opts.tail, + Details: true, } cli := dockerCli.Client() @@ -80,6 +81,7 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { var ( maxLength = 1 responseBody io.ReadCloser + tty bool ) service, _, err := cli.ServiceInspectWithRaw(ctx, opts.target) @@ -89,6 +91,14 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { return err } task, _, err := cli.TaskInspectWithRaw(ctx, opts.target) + tty = task.Spec.ContainerSpec.TTY + // TODO(dperny) hot fix until we get a nice details system squared away, + // ignores details (including task context) if we have a TTY log + if tty { + options.Details = false + } + + responseBody, err = cli.TaskLogs(ctx, opts.target, options) if err != nil { if client.IsErrTaskNotFound(err) { // if the task ALSO isn't found, rewrite the error to be clear @@ -100,6 +110,13 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { maxLength = getMaxLength(task.Slot) responseBody, err = cli.TaskLogs(ctx, opts.target, options) } else { + tty = service.Spec.TaskTemplate.ContainerSpec.TTY + // TODO(dperny) hot fix until we get a nice details system squared away, + // ignores details (including task context) if we have a TTY log + if tty { + options.Details = false + } + responseBody, err = cli.ServiceLogs(ctx, opts.target, options) if err != nil { return err @@ -112,6 +129,11 @@ func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { } defer responseBody.Close() + if tty { + _, err = io.Copy(dockerCli.Out(), responseBody) + return err + } + taskFormatter := newTaskFormatter(cli, opts, maxLength) stdout := &logWriter{ctx: ctx, opts: opts, f: taskFormatter, w: dockerCli.Out()} diff --git a/daemon/cluster/executor/backend.go b/daemon/cluster/executor/backend.go index 5fe953ac05..acd018315b 100644 --- a/daemon/cluster/executor/backend.go +++ b/daemon/cluster/executor/backend.go @@ -33,7 +33,7 @@ type Backend interface { CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error) ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error ContainerStop(name string, seconds *int) error - ContainerLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error + ContainerLogs(context.Context, string, *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error) ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error ActivateContainerServiceBinding(containerName string) error DeactivateContainerServiceBinding(containerName string) error diff --git a/daemon/cluster/executor/container/adapter.go b/daemon/cluster/executor/container/adapter.go index 02ccf62c7e..1c669e68e2 100644 --- a/daemon/cluster/executor/container/adapter.go +++ b/daemon/cluster/executor/container/adapter.go @@ -396,26 +396,15 @@ func (c *containerAdapter) deactivateServiceBinding() error { return c.backend.DeactivateContainerServiceBinding(c.container.name()) } -func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) { - // we can't handle the peculiarities of a TTY-attached container yet - conf := c.container.config() - if conf != nil && conf.Tty { - return nil, errors.New("logs not supported on containers with a TTY attached") - } +func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) { + apiOptions := &types.ContainerLogsOptions{ + Follow: options.Follow, - reader, writer := io.Pipe() - - apiOptions := &backend.ContainerLogsConfig{ - ContainerLogsOptions: types.ContainerLogsOptions{ - Follow: options.Follow, - - // TODO(stevvooe): Parse timestamp out of message. This - // absolutely needs to be done before going to production with - // this, at it is completely redundant. - Timestamps: true, - Details: false, // no clue what to do with this, let's just deprecate it. - }, - OutStream: writer, + // TODO(stevvooe): Parse timestamp out of message. This + // absolutely needs to be done before going to production with + // this, at it is completely redundant. + Timestamps: true, + Details: false, // no clue what to do with this, let's just deprecate it. } if options.Since != nil { @@ -449,14 +438,11 @@ func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscription } } } - - chStarted := make(chan struct{}) - go func() { - defer writer.Close() - c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted) - }() - - return reader, nil + msgs, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions) + if err != nil { + return nil, err + } + return msgs, nil } // todo: typed/wrapped errors diff --git a/daemon/cluster/executor/container/controller.go b/daemon/cluster/executor/container/controller.go index da09d2ee47..5c6f803509 100644 --- a/daemon/cluster/executor/container/controller.go +++ b/daemon/cluster/executor/container/controller.go @@ -1,11 +1,7 @@ package container import ( - "bufio" - "bytes" - "encoding/binary" "fmt" - "io" "os" "strconv" "strings" @@ -445,11 +441,12 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti return errors.Wrap(err, "container not ready for logs") } - rc, err := r.adapter.logs(ctx, options) + logsContext, cancel := context.WithCancel(ctx) + msgs, err := r.adapter.logs(logsContext, options) + defer cancel() if err != nil { return errors.Wrap(err, "failed getting container logs") } - defer rc.Close() var ( // use a rate limiter to keep things under control but also provides some @@ -462,53 +459,38 @@ func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, opti } ) - brd := bufio.NewReader(rc) for { - // so, message header is 8 bytes, treat as uint64, pull stream off MSB - var header uint64 - if err := binary.Read(brd, binary.BigEndian, &header); err != nil { - if err == io.EOF { - return nil - } - - return errors.Wrap(err, "failed reading log header") + msg, ok := <-msgs + if !ok { + // we're done here, no more messages + return nil } - stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3)) + if msg.Err != nil { + // the defered cancel closes the adapter's log stream + return msg.Err + } - // limit here to decrease allocation back pressure. - if err := limiter.WaitN(ctx, int(size)); err != nil { + // wait here for the limiter to catch up + if err := limiter.WaitN(ctx, len(msg.Line)); err != nil { return errors.Wrap(err, "failed rate limiter") } - - buf := make([]byte, size) - _, err := io.ReadFull(brd, buf) - if err != nil { - return errors.Wrap(err, "failed reading buffer") - } - - // Timestamp is RFC3339Nano with 1 space after. Lop, parse, publish - parts := bytes.SplitN(buf, []byte(" "), 2) - if len(parts) != 2 { - return fmt.Errorf("invalid timestamp in log message: %v", buf) - } - - ts, err := time.Parse(time.RFC3339Nano, string(parts[0])) - if err != nil { - return errors.Wrap(err, "failed to parse timestamp") - } - - tsp, err := gogotypes.TimestampProto(ts) + tsp, err := gogotypes.TimestampProto(msg.Timestamp) if err != nil { return errors.Wrap(err, "failed to convert timestamp") } + var stream api.LogStream + if msg.Source == "stdout" { + stream = api.LogStreamStdout + } else if msg.Source == "stderr" { + stream = api.LogStreamStderr + } if err := publisher.Publish(ctx, api.LogMessage{ Context: msgctx, Timestamp: tsp, - Stream: api.LogStream(stream), - - Data: parts[1], + Stream: stream, + Data: msg.Line, }); err != nil { return errors.Wrap(err, "failed to publish log message") } diff --git a/daemon/cluster/services.go b/daemon/cluster/services.go index b12b60fcbd..8fd730eee7 100644 --- a/daemon/cluster/services.go +++ b/daemon/cluster/services.go @@ -18,9 +18,6 @@ import ( types "github.com/docker/docker/api/types/swarm" timetypes "github.com/docker/docker/api/types/time" "github.com/docker/docker/daemon/cluster/convert" - "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/ioutils" - "github.com/docker/docker/pkg/stdcopy" runconfigopts "github.com/docker/docker/runconfig/opts" swarmapi "github.com/docker/swarmkit/api" gogotypes "github.com/gogo/protobuf/types" @@ -303,56 +300,44 @@ func (c *Cluster) RemoveService(input string) error { } // ServiceLogs collects service logs and writes them back to `config.OutStream` -func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *backend.ContainerLogsConfig, started chan struct{}) error { +func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *apitypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error) { c.mu.RLock() - defer func() { - select { - case <-started: - // if we've started streaming logs, we are no longer holding the - // lock and do not have to release it - return - default: - // before we start, though, we're holding this lock and it needs to - // be released - c.mu.RUnlock() - } - }() + defer c.mu.RUnlock() + state := c.currentNodeState() if !state.IsActiveManager() { - return c.errNoManager(state) + return nil, c.errNoManager(state) } - swarmSelector, tty, err := convertSelector(ctx, state.controlClient, selector) + swarmSelector, err := convertSelector(ctx, state.controlClient, selector) if err != nil { - return errors.Wrap(err, "error making log selector") - } - - // TODO(dperny) this goes away when we support TTY logs, which is in the works - if tty { - return errors.New("service logs not supported on tasks with a TTY attached") + return nil, errors.Wrap(err, "error making log selector") } // set the streams we'll use stdStreams := []swarmapi.LogStream{} - if config.ContainerLogsOptions.ShowStdout { + if config.ShowStdout { stdStreams = append(stdStreams, swarmapi.LogStreamStdout) } - if config.ContainerLogsOptions.ShowStderr { + if config.ShowStderr { stdStreams = append(stdStreams, swarmapi.LogStreamStderr) } // Get tail value squared away - the number of previous log lines we look at var tail int64 + // in ContainerLogs, if the tail value is ANYTHING non-integer, we just set + // it to -1 (all). i don't agree with that, but i also think no tail value + // should be legitimate. if you don't pass tail, we assume you want "all" if config.Tail == "all" || config.Tail == "" { // tail of 0 means send all logs on the swarmkit side tail = 0 } else { t, err := strconv.Atoi(config.Tail) if err != nil { - return errors.New("tail value must be a positive integer or \"all\"") + return nil, errors.New("tail value must be a positive integer or \"all\"") } if t < 0 { - return errors.New("negative tail values not supported") + return nil, errors.New("negative tail values not supported") } // we actually use negative tail in swarmkit to represent messages // backwards starting from the beginning. also, -1 means no logs. so, @@ -370,12 +355,12 @@ func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector if config.Since != "" { s, n, err := timetypes.ParseTimestamps(config.Since, 0) if err != nil { - return errors.Wrap(err, "could not parse since timestamp") + return nil, errors.Wrap(err, "could not parse since timestamp") } since := time.Unix(s, n) sinceProto, err = gogotypes.TimestampProto(since) if err != nil { - return errors.Wrap(err, "could not parse timestamp to proto") + return nil, errors.Wrap(err, "could not parse timestamp to proto") } } @@ -389,106 +374,96 @@ func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector }, }) if err != nil { - return err + return nil, err } - wf := ioutils.NewWriteFlusher(config.OutStream) - defer wf.Close() - - // Release the lock before starting the stream. - // - // this feels like it could be racy because we would double unlock if we - // somehow returned right after we unlocked but before we closed, but I do - // not think such a thing is possible. i wish it were possible to atomically - // close and unlock but that might be overkill. programming is hard. - c.mu.RUnlock() - close(started) - - wf.Flush() - - outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout) - errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr) - - for { - // Check the context before doing anything. - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - subscribeMsg, err := stream.Recv() - if err == io.EOF { - return nil - } - if err != nil { - return err - } - - for _, msg := range subscribeMsg.Messages { - data := []byte{} - - if config.Timestamps { - ts, err := gogotypes.TimestampFromProto(msg.Timestamp) - if err != nil { - return err + messageChan := make(chan *backend.LogMessage, 1) + go func() { + defer close(messageChan) + for { + // Check the context before doing anything. + select { + case <-ctx.Done(): + return + default: + } + subscribeMsg, err := stream.Recv() + if err == io.EOF { + return + } + // if we're not io.EOF, push the message in and return + if err != nil { + select { + case <-ctx.Done(): + case messageChan <- &backend.LogMessage{Err: err}: } - data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...) + return } - data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ", - contextPrefix, msg.Context.NodeID, - contextPrefix, msg.Context.ServiceID, - contextPrefix, msg.Context.TaskID, - ))...) + for _, msg := range subscribeMsg.Messages { + // make a new message + m := new(backend.LogMessage) + m.Attrs = make(backend.LogAttributes) + // add the timestamp, adding the error if it fails + m.Timestamp, err = gogotypes.TimestampFromProto(msg.Timestamp) + if err != nil { + m.Err = err + } + m.Attrs[contextPrefix+".node.id"] = msg.Context.NodeID + m.Attrs[contextPrefix+".service.id"] = msg.Context.ServiceID + m.Attrs[contextPrefix+".task.id"] = msg.Context.TaskID + switch msg.Stream { + case swarmapi.LogStreamStdout: + m.Source = "stdout" + case swarmapi.LogStreamStderr: + m.Source = "stderr" + } + m.Line = msg.Data - data = append(data, msg.Data...) - - switch msg.Stream { - case swarmapi.LogStreamStdout: - outStream.Write(data) - case swarmapi.LogStreamStderr: - errStream.Write(data) + // there could be a case where the reader stops accepting + // messages and the context is canceled. we need to check that + // here, or otherwise we risk blocking forever on the message + // send. + select { + case <-ctx.Done(): + return + case messageChan <- m: + } } } - } + }() + return messageChan, nil } // convertSelector takes a backend.LogSelector, which contains raw names that // may or may not be valid, and converts them to an api.LogSelector proto. It -// also returns a boolean, true if any of the services use a TTY (false -// otherwise) and an error if something fails -func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, bool, error) { - // if ANY tasks use a TTY, don't mux streams - var tty bool +// returns an error if something fails +func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, error) { // don't rely on swarmkit to resolve IDs, do it ourselves swarmSelector := &swarmapi.LogSelector{} for _, s := range selector.Services { service, err := getService(ctx, cc, s) if err != nil { - return nil, false, err + return nil, err } c := service.Spec.Task.GetContainer() if c == nil { - return nil, false, errors.New("logs only supported on container tasks") + return nil, errors.New("logs only supported on container tasks") } - // set TTY true if we have a TTY service, or if it's already true - tty = tty || c.TTY swarmSelector.ServiceIDs = append(swarmSelector.ServiceIDs, service.ID) } for _, t := range selector.Tasks { task, err := getTask(ctx, cc, t) if err != nil { - return nil, false, err + return nil, err } c := task.Spec.GetContainer() if c == nil { - return nil, false, errors.New("logs only supported on container tasks") + return nil, errors.New("logs only supported on container tasks") } - tty = tty || c.TTY swarmSelector.TaskIDs = append(swarmSelector.TaskIDs, task.ID) } - return swarmSelector, tty, nil + return swarmSelector, nil } // imageWithDigestString takes an image such as name or name:tag diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 7172663aa0..1135195dc2 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -9,11 +9,10 @@ package logger import ( "errors" - "sort" - "strings" "sync" "time" + "github.com/docker/docker/api/types/backend" "github.com/docker/docker/pkg/jsonlog" ) @@ -43,14 +42,13 @@ func PutMessage(msg *Message) { // Message is datastructure that represents piece of output produced by some // container. The Line member is a slice of an array whose contents can be // changed after a log driver's Log() method returns. +// +// Message is subtyped from backend.LogMessage because there is a lot of +// internal complexity around the Message type that should not be exposed +// to any package not explicitly importing the logger type. +// // Any changes made to this struct must also be updated in the `reset` function -type Message struct { - Line []byte - Source string - Timestamp time.Time - Attrs LogAttributes - Partial bool -} +type Message backend.LogMessage // reset sets the message back to default values // This is used when putting a message back into the message pool. @@ -60,31 +58,20 @@ func (m *Message) reset() { m.Source = "" m.Attrs = nil m.Partial = false + + m.Err = nil +} + +// AsLogMessage returns a pointer to the message as a pointer to +// backend.LogMessage, which is an identical type with a different purpose +func (m *Message) AsLogMessage() *backend.LogMessage { + return (*backend.LogMessage)(m) } // LogAttributes is used to hold the extra attributes available in the log message // Primarily used for converting the map type to string and sorting. -type LogAttributes map[string]string -type byKey []string - -func (s byKey) Len() int { return len(s) } -func (s byKey) Less(i, j int) bool { - keyI := strings.Split(s[i], "=") - keyJ := strings.Split(s[j], "=") - return keyI[0] < keyJ[0] -} -func (s byKey) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -func (a LogAttributes) String() string { - var ss byKey - for k, v := range a { - ss = append(ss, k+"="+v) - } - sort.Sort(ss) - return strings.Join(ss, ",") -} +// Imported here so it can be used internally with less refactoring +type LogAttributes backend.LogAttributes // Logger is the interface for docker logging drivers. type Logger interface { diff --git a/daemon/logs.go b/daemon/logs.go index b1b0bbb910..b207fb693e 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -2,48 +2,57 @@ package daemon import ( "errors" - "io" "strconv" "time" "golang.org/x/net/context" "github.com/Sirupsen/logrus" + "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" containertypes "github.com/docker/docker/api/types/container" timetypes "github.com/docker/docker/api/types/time" "github.com/docker/docker/container" "github.com/docker/docker/daemon/logger" - "github.com/docker/docker/pkg/ioutils" - "github.com/docker/docker/pkg/stdcopy" ) -// ContainerLogs hooks up a container's stdout and stderr streams -// configured with the given struct. -func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error { +// ContainerLogs copies the container's log channel to the channel provided in +// the config. If ContainerLogs returns an error, no messages have been copied. +// and the channel will be closed without data. +// +// if it returns nil, the config channel will be active and return log +// messages until it runs out or the context is canceled. +func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *types.ContainerLogsOptions) (<-chan *backend.LogMessage, error) { + lg := logrus.WithFields(logrus.Fields{ + "module": "daemon", + "method": "(*Daemon).ContainerLogs", + "container": containerName, + }) + if !(config.ShowStdout || config.ShowStderr) { - return errors.New("You must choose at least one stream") + return nil, errors.New("You must choose at least one stream") } container, err := daemon.GetContainer(containerName) if err != nil { - return err + return nil, err } if container.RemovalInProgress || container.Dead { - return errors.New("can not get logs from container which is dead or marked for removal") + return nil, errors.New("can not get logs from container which is dead or marked for removal") } if container.HostConfig.LogConfig.Type == "none" { - return logger.ErrReadLogsNotSupported + return nil, logger.ErrReadLogsNotSupported } cLog, err := daemon.getLogger(container) if err != nil { - return err + return nil, err } + logReader, ok := cLog.(logger.LogReader) if !ok { - return logger.ErrReadLogsNotSupported + return nil, logger.ErrReadLogsNotSupported } follow := config.Follow && container.IsRunning() @@ -52,76 +61,91 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c tailLines = -1 } - logrus.Debug("logs: begin stream") - var since time.Time if config.Since != "" { s, n, err := timetypes.ParseTimestamps(config.Since, 0) if err != nil { - return err + return nil, err } since = time.Unix(s, n) } + readConfig := logger.ReadConfig{ Since: since, Tail: tailLines, Follow: follow, } + logs := logReader.ReadLogs(readConfig) - // Close logWatcher on exit - defer func() { - logs.Close() - if cLog != container.LogDriver { - // Since the logger isn't cached in the container, which - // occurs if it is running, it must get explicitly closed - // here to avoid leaking it and any file handles it has. - if err := cLog.Close(); err != nil { - logrus.Errorf("Error closing logger: %v", err) + + // past this point, we can't possibly return any errors, so we can just + // start a goroutine and return to tell the caller not to expect errors + // (if the caller wants to give up on logs, they have to cancel the context) + // this goroutine functions as a shim between the logger and the caller. + messageChan := make(chan *backend.LogMessage, 1) + go func() { + // set up some defers + defer func() { + // ok so this function, originally, was placed right after that + // logger.ReadLogs call above. I THINK that means it sets off the + // chain of events that results in the logger needing to be closed. + // i do not know if an error in time parsing above causing an early + // return will result in leaking the logger. if that is the case, + // it would also have been a bug in the original code + logs.Close() + if cLog != container.LogDriver { + // Since the logger isn't cached in the container, which + // occurs if it is running, it must get explicitly closed + // here to avoid leaking it and any file handles it has. + if err := cLog.Close(); err != nil { + logrus.Errorf("Error closing logger: %v", err) + } + } + }() + // close the messages channel. closing is the only way to signal above + // that we're doing with logs (other than context cancel i guess). + defer close(messageChan) + + lg.Debug("begin logs") + for { + select { + // i do not believe as the system is currently designed any error + // is possible, but we should be prepared to handle it anyway. if + // we do get an error, copy only the error field to a new object so + // we don't end up with partial data in the other fields + case err := <-logs.Err: + lg.Errorf("Error streaming logs: %v", err) + select { + case <-ctx.Done(): + case messageChan <- &backend.LogMessage{Err: err}: + } + return + case <-ctx.Done(): + lg.Debug("logs: end stream, ctx is done: %v", ctx.Err()) + return + case msg, ok := <-logs.Msg: + // there is some kind of pool or ring buffer in the logger that + // produces these messages, and a possible future optimization + // might be to use that pool and reuse message objects + if !ok { + lg.Debug("end logs") + return + } + m := msg.AsLogMessage() // just a pointer conversion, does not copy data + + // there could be a case where the reader stops accepting + // messages and the context is canceled. we need to check that + // here, or otherwise we risk blocking forever on the message + // send. + select { + case <-ctx.Done(): + return + case messageChan <- m: + } } } }() - - wf := ioutils.NewWriteFlusher(config.OutStream) - defer wf.Close() - close(started) - wf.Flush() - - var outStream io.Writer - outStream = wf - errStream := outStream - if !container.Config.Tty { - errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr) - outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) - } - - for { - select { - case err := <-logs.Err: - logrus.Errorf("Error streaming logs: %v", err) - return nil - case <-ctx.Done(): - logrus.Debugf("logs: end stream, ctx is done: %v", ctx.Err()) - return nil - case msg, ok := <-logs.Msg: - if !ok { - logrus.Debug("logs: end stream") - return nil - } - logLine := msg.Line - if config.Details { - logLine = append([]byte(msg.Attrs.String()+" "), logLine...) - } - if config.Timestamps { - logLine = append([]byte(msg.Timestamp.Format(logger.TimeFormat)+" "), logLine...) - } - if msg.Source == "stdout" && config.ShowStdout { - outStream.Write(logLine) - } - if msg.Source == "stderr" && config.ShowStderr { - errStream.Write(logLine) - } - } - } + return messageChan, nil } func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger, error) { diff --git a/integration-cli/docker_cli_service_logs_experimental_test.go b/integration-cli/docker_cli_service_logs_experimental_test.go index 80a3b277b1..96bb48bfe1 100644 --- a/integration-cli/docker_cli_service_logs_experimental_test.go +++ b/integration-cli/docker_cli_service_logs_experimental_test.go @@ -254,3 +254,43 @@ func (s *DockerSwarmSuite) TestServiceLogsTaskLogs(c *check.C) { } } } + +func (s *DockerSwarmSuite) TestServiceLogsTTY(c *check.C) { + testRequires(c, ExperimentalDaemon) + + d := s.AddDaemon(c, true, true) + + name := "TestServiceLogsTTY" + + result := icmd.RunCmd(d.Command( + // create a service + "service", "create", + // name it $name + "--name", name, + // use a TTY + "-t", + // busybox image, shell string + "busybox", "sh", "-c", + // echo to stdout and stderr + "echo out; (echo err 1>&2); sleep 10000", + )) + + result.Assert(c, icmd.Expected{}) + id := strings.TrimSpace(result.Stdout()) + c.Assert(id, checker.Not(checker.Equals), "") + // so, right here, we're basically inspecting by id and returning only + // the ID. if they don't match, the service doesn't exist. + result = icmd.RunCmd(d.Command("service", "inspect", "--format=\"{{.ID}}\"", id)) + result.Assert(c, icmd.Expected{Out: id}) + + // make sure task has been deployed. + waitAndAssert(c, defaultReconciliationTimeout, d.CheckActiveContainerCount, checker.Equals, 1) + // and make sure we have all the log lines + waitAndAssert(c, defaultReconciliationTimeout, countLogLines(d, name), checker.Equals, 2) + + cmd := d.Command("service", "logs", name) + result = icmd.RunCmd(cmd) + // for some reason there is carriage return in the output. i think this is + // just expected. + c.Assert(result, icmd.Matches, icmd.Expected{Out: "out\r\nerr\r\n"}) +}